refactor(executors): 重构执行器策略为枚举类型并增强CLI功能

- 将 Strategy 从字符串字面量改为枚举类型,提供 SEQUENTIAL、THREAD 和 ASYNC 选项
- 添加策略归一化函数 _normalize_strategy,支持字符串和枚举类型的输入
- 重构 run 函数接受新的 Strategy 枚举类型,默认值改为 Strategy.SEQUENTIAL
- 添加 verbose 模式支持,在任务执行时打印生命周期信息
- 实现命令行运行器 CliRunner,提供命令行界面和参数解析功能
- 为 TaskSpec 添加 verbose 字段,控制子进程命令的详细输出
- 重构 pymake CLI 实现,使用新的命令行运行器架构
- 更新测试用例中的 depends_on 参数语法
This commit is contained in:
2026-06-20 17:20:05 +08:00
parent 6d4b5e4a1f
commit 13f6110b18
11 changed files with 986 additions and 349 deletions
+3 -2
View File
@@ -71,10 +71,10 @@ from .errors import (
TaskFailedError,
TaskTimeoutError,
)
from .executors import run
from .executors import Strategy, run
from .graph import Graph
from .report import RunReport
from .runner import CliExitCode, CliRunner
from .cli import CliExitCode, CliRunner
from .storage import JSONBackend, MemoryBackend, StateBackend
from .task import TaskCmd, TaskEvent, TaskResult, TaskSpec, TaskStatus
@@ -92,6 +92,7 @@ __all__ = [
"RunReport",
# 执行
"run",
"Strategy",
# CLI 运行器
"CliRunner",
"CliExitCode",
+5
View File
@@ -0,0 +1,5 @@
"""命令行运行器子包."""
from .runner import CliExitCode, CliRunner
__all__ = ["CliRunner", "CliExitCode"]
+281 -239
View File
@@ -82,270 +82,312 @@ def _build_graphs() -> dict[str, px.Graph]:
return {
# === 构建命令 ===
# 构建 Python 包
"b": px.Graph.from_specs([
px.TaskSpec(
"uv_build",
cmd=conf.BUILD_COMMAND,
conditions=(_UV_CONDITION,),
timeout=conf.TIMEOUT,
),
]),
"b": px.Graph.from_specs(
[
px.TaskSpec(
"uv_build",
cmd=conf.BUILD_COMMAND,
conditions=(_UV_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 构建 Rust 核心模块
"bc": px.Graph.from_specs([
px.TaskSpec(
"maturin_build",
cmd=_get_maturin_build_command(),
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
]),
"bc": px.Graph.from_specs(
[
px.TaskSpec(
"maturin_build",
cmd=_get_maturin_build_command(),
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 构建双包(先 Rust 后 Python
"ba": px.Graph.from_specs([
px.TaskSpec(
"maturin_build",
cmd=_get_maturin_build_command(),
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
px.TaskSpec(
"uv_build",
cmd=conf.BUILD_COMMAND,
conditions=(_UV_CONDITION,),
timeout=conf.TIMEOUT,
depends_on=("maturin_build",),
),
]),
"ba": px.Graph.from_specs(
[
px.TaskSpec(
"maturin_build",
cmd=_get_maturin_build_command(),
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
px.TaskSpec(
"uv_build",
cmd=conf.BUILD_COMMAND,
conditions=(_UV_CONDITION,),
timeout=conf.TIMEOUT,
depends_on=("maturin_build",),
),
]
),
# === 安装命令(开发模式) ===
# 安装 Rust 核心模块
"ic": px.Graph.from_specs([
px.TaskSpec(
"maturin_dev",
cmd=conf.MATURIN_DEV_COMMAND,
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
),
]),
"ic": px.Graph.from_specs(
[
px.TaskSpec(
"maturin_dev",
cmd=conf.MATURIN_DEV_COMMAND,
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
),
]
),
# 安装 Python 主包
"ip": px.Graph.from_specs([
px.TaskSpec(
"uv_install",
cmd=["uv", "pip", "install", "-e", "."],
conditions=(_UV_CONDITION,),
),
]),
"ip": px.Graph.from_specs(
[
px.TaskSpec(
"uv_install",
cmd=["uv", "pip", "install", "-e", "."],
conditions=(_UV_CONDITION,),
),
]
),
# 安装双包(开发模式)
"ia": px.Graph.from_specs([
px.TaskSpec(
"maturin_dev",
cmd=conf.MATURIN_DEV_COMMAND,
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
),
px.TaskSpec(
"uv_install",
cmd=["uv", "pip", "install", "-e", "."],
conditions=(_UV_CONDITION,),
depends_on=("maturin_dev",),
),
]),
"ia": px.Graph.from_specs(
[
px.TaskSpec(
"maturin_dev",
cmd=conf.MATURIN_DEV_COMMAND,
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
),
px.TaskSpec(
"uv_install",
cmd=["uv", "pip", "install", "-e", "."],
conditions=(_UV_CONDITION,),
depends_on=("maturin_dev",),
),
]
),
# === 清理命令 ===
# 清理 Python 构建产物
"cp": px.Graph.from_specs([
px.TaskSpec(
"git_clean_python",
cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE],
conditions=(_GIT_CONDITION,),
),
]),
"cp": px.Graph.from_specs(
[
px.TaskSpec(
"git_clean_python",
cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE],
conditions=(_GIT_CONDITION,),
),
]
),
# 清理 Rust 构建产物
"cc": px.Graph.from_specs([
px.TaskSpec(
"cargo_clean",
cmd=["cargo", "clean"],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
),
]),
"cc": px.Graph.from_specs(
[
px.TaskSpec(
"cargo_clean",
cmd=["cargo", "clean"],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
),
]
),
# 清理所有构建产物
"ca": px.Graph.from_specs([
px.TaskSpec(
"cargo_clean",
cmd=["cargo", "clean"],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
),
px.TaskSpec(
"git_clean",
cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE],
conditions=(_GIT_CONDITION,),
),
]),
"ca": px.Graph.from_specs(
[
px.TaskSpec(
"cargo_clean",
cmd=["cargo", "clean"],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
),
px.TaskSpec(
"git_clean",
cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE],
conditions=(_GIT_CONDITION,),
),
]
),
# === 开发工具 ===
# 运行测试, 跳过 slow, 并行模式
"t": px.Graph.from_specs([
px.TaskSpec(
"pytest",
cmd=[
"t": px.Graph.from_specs(
[
px.TaskSpec(
"pytest",
"-m",
"not slow",
"-n",
"8",
"--dist",
"loadfile",
"--color=yes",
"--durations=10",
],
conditions=(_PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]),
cmd=[
"pytest",
"-m",
"not slow",
"-n",
"8",
"--dist",
"loadfile",
"--color=yes",
"--durations=10",
],
conditions=(_PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 运行测试, 非并行模式
"tf": px.Graph.from_specs([
px.TaskSpec(
"pytest",
cmd=[
"tf": px.Graph.from_specs(
[
px.TaskSpec(
"pytest",
"-m",
"not slow",
"--dist",
"loadfile",
"--color=yes",
"--durations=10",
],
conditions=(_PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]),
cmd=[
"pytest",
"-m",
"not slow",
"--dist",
"loadfile",
"--color=yes",
"--durations=10",
],
conditions=(_PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 运行测试并生成覆盖率报告, 跳过 slow, 并行模式
"tc": px.Graph.from_specs([
px.TaskSpec(
"pytest_cov",
cmd=[
"pytest",
"-m",
"not slow",
"--cov",
"-n",
"auto",
"--dist",
"loadfile",
"--tb=short",
"-v",
"--color=yes",
"--durations=10",
],
conditions=(_PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]),
"tc": px.Graph.from_specs(
[
px.TaskSpec(
"pytest_cov",
cmd=[
"pytest",
"-m",
"not slow",
"--cov",
"-n",
"auto",
"--dist",
"loadfile",
"--tb=short",
"-v",
"--color=yes",
"--durations=10",
],
conditions=(_PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 代码格式化与检查
"lint": px.Graph.from_specs([
px.TaskSpec(
"ruff_check",
cmd=[
"ruff",
"check",
"--fix",
"--unsafe-fixes",
],
conditions=(_RUFF_CONDITION,),
timeout=conf.TIMEOUT,
cwd=Path(conf.PROJECT_ROOT),
),
]),
"lint": px.Graph.from_specs(
[
px.TaskSpec(
"ruff_check",
cmd=[
"ruff",
"check",
"--fix",
"--unsafe-fixes",
],
conditions=(_RUFF_CONDITION,),
timeout=conf.TIMEOUT,
cwd=Path(conf.PROJECT_ROOT),
),
]
),
# 类型检查
"typecheck": px.Graph.from_specs([
px.TaskSpec(
"ty_check",
cmd=["ty", "check", "src/bitool"],
conditions=(BuiltinConditions.HAS_APP_INSTALLED("ty"),),
),
]),
"typecheck": px.Graph.from_specs(
[
px.TaskSpec(
"ty_check",
cmd=["ty", "check", "src/bitool"],
conditions=(BuiltinConditions.HAS_APP_INSTALLED("ty"),),
),
]
),
# 构建文档
"doc": px.Graph.from_specs([
px.TaskSpec(
"sphinx_build",
cmd=conf.DOC_BUILD_COMMAND,
conditions=(BuiltinConditions.HAS_APP_INSTALLED(conf.DOC_BUILD_TOOL),),
),
]),
"doc": px.Graph.from_specs(
[
px.TaskSpec(
"sphinx_build",
cmd=conf.DOC_BUILD_COMMAND,
conditions=(
BuiltinConditions.HAS_APP_INSTALLED(conf.DOC_BUILD_TOOL),
),
),
]
),
# === 发布命令 ===
# 发布 Python 主包到 PyPI
"pb": px.Graph.from_specs([
px.TaskSpec(
"publish_python",
cmd=["hatch", "publish"],
cwd=Path(conf.PROJECT_ROOT),
conditions=(_HATCH_CONDITION,),
timeout=conf.TIMEOUT,
),
]),
"pb": px.Graph.from_specs(
[
px.TaskSpec(
"publish_python",
cmd=["hatch", "publish"],
cwd=Path(conf.PROJECT_ROOT),
conditions=(_HATCH_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 发布所有包(先 Rust 后 Python
"pba": px.Graph.from_specs([
px.TaskSpec(
"publish_rust",
cmd=[
"twine",
"upload",
"--disable-progress-bar",
conf.CORE_PATTERN,
],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
px.TaskSpec(
"publish_python",
cmd=["hatch", "publish"],
cwd=Path(conf.PROJECT_ROOT),
conditions=(_HATCH_CONDITION,),
timeout=conf.TIMEOUT,
depends_on=("publish_rust",),
),
]),
"pba": px.Graph.from_specs(
[
px.TaskSpec(
"publish_rust",
cmd=[
"twine",
"upload",
"--disable-progress-bar",
conf.CORE_PATTERN,
],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
px.TaskSpec(
"publish_python",
cmd=["hatch", "publish"],
cwd=Path(conf.PROJECT_ROOT),
conditions=(_HATCH_CONDITION,),
timeout=conf.TIMEOUT,
depends_on=("publish_rust",),
),
]
),
# 发布 Rust 核心模块 (maturin publish)
"pbc": px.Graph.from_specs([
px.TaskSpec(
"publish_rust",
cmd=["maturin", "publish"],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
]),
"pbc": px.Graph.from_specs(
[
px.TaskSpec(
"publish_rust",
cmd=["maturin", "publish"],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# === 多版本测试命令 ===
# 运行多版本 Python 测试 (tox)
"tox": px.Graph.from_specs([
px.TaskSpec(
"tox_run",
cmd=["tox", "-p", "auto"],
conditions=(_TOX_CONDITION,),
timeout=conf.TIMEOUT,
),
]),
"tox": px.Graph.from_specs(
[
px.TaskSpec(
"tox_run",
cmd=["tox", "-p", "auto"],
conditions=(_TOX_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 安装多版本 Python (仅安装不测试)
"tox-install": px.Graph.from_specs([
px.TaskSpec(
"uv_python_install",
cmd=[
"uv",
"python",
"install",
"3.8",
"3.9",
"3.10",
"3.11",
"3.12",
"3.13",
"3.14",
],
conditions=(_UV_CONDITION,),
timeout=600,
),
]),
"tox-install": px.Graph.from_specs(
[
px.TaskSpec(
"uv_python_install",
cmd=[
"uv",
"python",
"install",
"3.8",
"3.9",
"3.10",
"3.11",
"3.12",
"3.13",
"3.14",
],
conditions=(_UV_CONDITION,),
timeout=600,
),
]
),
}
@@ -405,6 +447,6 @@ def main():
runner = px.CliRunner(
strategy=px.Strategy.SEQUENTIAL,
description="PyMake - Python 构建工具 (替代 Makefile)",
graphs=**_build_graphs(),
**_build_graphs(),
)
runner.run_cli()
+300
View File
@@ -0,0 +1,300 @@
"""命令行运行器:根据用户输入执行对应的任务流图.
参考 bitool_skill 的 MapSkill 设计, 将命令名映射到 Graph 实例,
通过 argparse 解析用户输入的命令并执行对应的图.
与 bitool_skill.MapSkill 的区别:
- MapSkill 通过继承 + create_scheduler_map 构建命令映射
- CliRunner 通过关键字参数直接注入命令到图的映射, 更声明式
- CliRunner 复用 pyflowx 的 DAG 调度能力 (run/Graph/TaskSpec)
verbose 模式
------------
``CliRunner`` 默认 ``verbose=True``, 会:
1. 打印任务生命周期 (开始/成功/失败/跳过) 到 stdout
2. 对 ``cmd`` 类任务, 显示执行的命令及其标准输出/标准错误
可通过构造参数 ``verbose=False`` 或命令行 ``--quiet`` 关闭.
"""
from __future__ import annotations
import argparse
import dataclasses
import enum
import sys
from typing import Dict, List, Optional, Sequence, Union
from ..errors import PyFlowXError
from ..executors import Strategy, _normalize_strategy, run
from ..graph import Graph
__all__ = ["CliRunner", "CliExitCode"]
class CliExitCode(enum.IntEnum):
"""CliRunner 退出码."""
SUCCESS = 0
FAILURE = 1
INTERRUPTED = 130 # 与 POSIX 信号中断一致
def _apply_verbose_to_graph(graph: Graph, verbose: bool) -> Graph:
"""创建新图, 其中所有 TaskSpec 的 verbose 字段被设置为指定值.
使用 ``dataclasses.replace`` 在不可变的 TaskSpec 上创建带 verbose 标记的副本.
依赖关系、标签等元数据全部保留.
Parameters
----------
graph : Graph
原始图.
verbose : bool
要设置的 verbose 值.
Returns
-------
Graph
所有 spec 的 verbose 字段已更新的新图.
"""
new_specs = []
for spec in graph.all_specs().values():
if spec.verbose == verbose:
new_specs.append(spec)
else:
new_specs.append(dataclasses.replace(spec, verbose=verbose))
return Graph.from_specs(new_specs)
class CliRunner:
"""命令行运行器: 根据用户输入执行对应的任务流图.
参考 bitool_skill 的 MapSkill 设计, 将命令名映射到 Graph 实例.
通过 ``sys.argv`` 解析用户输入的命令, 执行对应的图.
Parameters
----------
strategy : str | Strategy
默认执行策略 (``Strategy.SEQUENTIAL`` / ``Strategy.THREAD`` /
``Strategy.ASYNC`` 或对应字符串). 可被命令行 ``--strategy`` 覆盖.
description : str
CLI 描述文本, 显示在 ``--help`` 中.
verbose : bool
是否显示详细执行过程. ``True`` 时打印任务生命周期和 subprocess 输出.
默认 ``True``. 可被命令行 ``--quiet`` 关闭.
**graphs : Graph
命令名到图的映射. 每个 key 是一个命令名, value 是对应的
:class:`~pyflowx.graph.Graph`.
Examples
--------
基本用法::
runner = px.CliRunner(
clean=px.Graph.from_specs([
px.TaskSpec("cargo_clean", cmd=["cargo", "clean"]),
]),
build=px.Graph.from_specs([
px.TaskSpec("uv_build", cmd=["uv", "build"]),
]),
)
runner.run() # 解析 sys.argv
指定策略与描述::
runner = px.CliRunner(
strategy=px.Strategy.THREAD,
description="My build tool",
test=px.Graph.from_specs([...]),
)
runner.run(["test", "--strategy", "sequential"])
"""
def __init__(
self,
*,
strategy: Union[str, Strategy] = Strategy.SEQUENTIAL,
description: str = "",
verbose: bool = True,
**graphs: Graph,
) -> None:
if not graphs:
raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)")
# 校验所有值都是 Graph
for name, graph in graphs.items():
if not isinstance(graph, Graph):
raise TypeError(f"CliRunner 命令 {name!r} 的值必须是 Graph 实例, 实际是 {type(graph).__name__}")
self._graphs: Dict[str, Graph] = dict(graphs)
self._strategy: Strategy = _normalize_strategy(strategy)
self._description: str = description
self._verbose: bool = verbose
# ------------------------------------------------------------------ #
# 内省
# ------------------------------------------------------------------ #
@property
def commands(self) -> List[str]:
"""可用的命令列表 (按插入顺序)."""
return list(self._graphs.keys())
@property
def graphs(self) -> Dict[str, Graph]:
"""命令名到图的映射 (只读副本)."""
return dict(self._graphs)
@property
def strategy(self) -> Strategy:
"""默认执行策略."""
return self._strategy
@property
def description(self) -> str:
"""CLI 描述文本."""
return self._description
@property
def verbose(self) -> bool:
"""是否显示详细执行过程."""
return self._verbose
# ------------------------------------------------------------------ #
# 参数解析
# ------------------------------------------------------------------ #
def _prog_name(self) -> str:
"""从 sys.argv[0] 推导程序名."""
import os
return os.path.basename(sys.argv[0]) if sys.argv else "pyflowx"
def create_parser(self) -> argparse.ArgumentParser:
"""创建参数解析器.
子类可覆盖此方法以添加自定义参数. 覆盖时应保留 ``command``
位置参数与 ``--strategy`` / ``--dry-run`` / ``--list`` / ``--quiet``
选项, 否则 :meth:`run` 的默认逻辑可能失效.
Returns
-------
argparse.ArgumentParser
新创建的参数解析器实例.
"""
parser = argparse.ArgumentParser(
prog=self._prog_name(),
description=self._description or "PyFlowX CLI Runner",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=self._format_commands_help(),
)
parser.add_argument(
"command",
nargs="?",
help="要执行的命令",
)
parser.add_argument(
"--strategy",
choices=[s.value for s in Strategy],
default=self._strategy.value,
help="执行策略 (默认: %(default)s)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="只打印执行计划, 不实际运行",
)
parser.add_argument(
"--list",
action="store_true",
help="列出所有可用命令",
)
parser.add_argument(
"--quiet",
action="store_true",
help="静默模式, 不显示执行过程 (覆盖默认 verbose)",
)
return parser
def _format_commands_help(self) -> str:
"""格式化命令帮助文本."""
lines = ["可用命令:"]
for cmd in self._graphs:
lines.append(f" {cmd}")
return "\n".join(lines)
# ------------------------------------------------------------------ #
# 执行
# ------------------------------------------------------------------ #
def run(self, args: Optional[Sequence[str]] = None) -> int:
"""解析参数并执行对应的图.
Parameters
----------
args : Sequence[str] | None
参数列表, 默认使用 ``sys.argv[1:]``.
Returns
-------
int
退出码 (0 成功, 1 失败, 130 中断).
Raises
------
SystemExit
当 argparse 无法解析参数时 (与标准 argparse 行为一致).
"""
parser = self.create_parser()
parsed = parser.parse_args(args)
# --list: 列出命令
if parsed.list:
print(self._format_commands_help())
return CliExitCode.SUCCESS.value
# 无命令: 显示帮助
if not parsed.command:
parser.print_help()
return CliExitCode.FAILURE.value
# 验证命令
if parsed.command not in self._graphs:
available = ", ".join(self._graphs.keys())
print(
f"错误: 未知命令 {parsed.command!r} (可用命令: {available})",
file=sys.stderr,
)
return CliExitCode.FAILURE.value
# 确定是否 verbose: --quiet 覆盖默认值
verbose = self._verbose and not parsed.quiet
# 对图应用 verbose 设置 (重建带 verbose 标记的 spec)
graph = self._graphs[parsed.command]
if verbose:
graph = _apply_verbose_to_graph(graph, verbose=True)
# 执行对应的图
try:
report = run(
graph,
strategy=parsed.strategy,
dry_run=parsed.dry_run,
verbose=verbose,
)
return CliExitCode.SUCCESS.value if report.success else CliExitCode.FAILURE.value
except KeyboardInterrupt:
print("\n操作已取消", file=sys.stderr)
return CliExitCode.INTERRUPTED.value
except PyFlowXError as e:
print(f"错误: {e}", file=sys.stderr)
return CliExitCode.FAILURE.value
def run_cli(self, args: Optional[Sequence[str]] = None) -> None:
"""运行并以退出码退出进程.
作为 CLI 工具运行时的入口点, 等价于 ``sys.exit(self.run(args))``.
Parameters
----------
args : Sequence[str] | None
参数列表, 默认使用 ``sys.argv[1:]``.
"""
sys.exit(self.run(args))
+103 -12
View File
@@ -16,10 +16,11 @@ from __future__ import annotations
import asyncio
import concurrent.futures
import enum
import inspect
import logging
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, List, Mapping, Optional, cast
from typing import Any, Awaitable, Callable, Dict, List, Mapping, Optional, Union, cast
from .context import build_call_args, describe_injection
from .errors import TaskFailedError, TaskTimeoutError
@@ -33,8 +34,53 @@ logger = logging.getLogger("pyflowx")
# 观察者回调类型。
EventCallback = Callable[[TaskEvent], None]
# 策略选择字面量。
Strategy = str # "sequential" | "thread" | "async"
class Strategy(enum.Enum):
"""任务图执行策略.
Members
-------
SEQUENTIAL
顺序执行: 逐个运行任务, 确定性最高, 适合调试.
THREAD
线程池执行: 层内任务通过线程池并发, 适合 I/O 密集型同步任务.
ASYNC
异步执行: 通过 ``asyncio.gather`` 实现层内并发, 适合 I/O 密集型异步任务.
"""
SEQUENTIAL = "sequential"
THREAD = "thread"
ASYNC = "async"
def _normalize_strategy(strategy: Union[str, Strategy]) -> Strategy:
"""将字符串或 Strategy 归一化为 Strategy 枚举.
Parameters
----------
strategy : str | Strategy
策略值, 接受字符串 (``"sequential"`` / ``"thread"`` / ``"async"``)
或 :class:`Strategy` 枚举成员.
Returns
-------
Strategy
归一化后的枚举成员.
Raises
------
ValueError
策略不被识别时.
"""
if isinstance(strategy, Strategy):
return strategy
if isinstance(strategy, str):
try:
return Strategy(strategy)
except ValueError:
valid = ", ".join(repr(s.value) for s in Strategy)
raise ValueError(f"unknown strategy {strategy!r}; expected one of {valid}.") from None
raise TypeError(f"strategy must be str or Strategy, got {type(strategy).__name__}")
def _is_async_fn(spec: TaskSpec[object]) -> bool:
@@ -299,12 +345,51 @@ async def _execute_layer_async(
# ---------------------------------------------------------------------- #
# 公共 API
# ---------------------------------------------------------------------- #
def _make_verbose_callback(
on_event: Optional[EventCallback],
) -> Optional[EventCallback]:
"""包装 on_event 回调, 在 verbose 模式下打印任务生命周期.
Parameters
----------
on_event : EventCallback | None
用户提供的原始回调, 若为 None 则仅打印.
Returns
-------
EventCallback | None
包装后的回调.
"""
def _verbose_callback(event: TaskEvent) -> None:
# 先打印生命周期信息
dur = f" ({event.duration:.3f}s)" if event.duration is not None else ""
if event.status == TaskStatus.RUNNING:
print(f"[verbose] 任务 {event.task!r} 开始执行...", flush=True)
elif event.status == TaskStatus.SUCCESS:
print(f"[verbose] 任务 {event.task!r} 成功{dur}", flush=True)
elif event.status == TaskStatus.FAILED:
err = f": {event.error}" if event.error else ""
print(
f"[verbose] 任务 {event.task!r} 失败{dur} (尝试 {event.attempts} 次){err}",
flush=True,
)
elif event.status == TaskStatus.SKIPPED:
print(f"[verbose] 任务 {event.task!r} 跳过", flush=True)
# 再调用用户回调
if on_event is not None:
on_event(event)
return _verbose_callback
def run(
graph: Graph,
strategy: Strategy = "sequential",
strategy: Union[str, Strategy] = Strategy.SEQUENTIAL,
*,
max_workers: Optional[int] = None,
dry_run: bool = False,
verbose: bool = False,
on_event: Optional[EventCallback] = None,
state: Optional[StateBackend] = None,
) -> RunReport:
@@ -315,12 +400,16 @@ def run(
graph:
待执行的已校验 :class:`Graph`。
strategy:
``"sequential"``(默认)、``"thread"`` 或 ``"async"``。
执行策略, 接受 :class:`Strategy` 枚举成员或字符串
(``"sequential"`` / ``"thread"`` / ``"async"``). 默认 ``Strategy.SEQUENTIAL``.
max_workers:
``"thread"`` 的线程池大小。默认 ``min(32, len(layer))``。
dry_run:
若为 ``True``,打印执行计划(层 + 注入)并返回空报告,不执行
任何任务。
verbose:
若为 ``True``, 打印任务生命周期 (开始/成功/失败/跳过) 到 stdout.
注意: subprocess 命令的输出由 :class:`TaskSpec` 的 ``verbose`` 字段控制.
on_event:
可选回调,在每次状态转换时调用。
state:
@@ -335,8 +424,7 @@ def run(
任何任务耗尽重试后仍失败时。运行在失败层中止;后续层的任务
不会被执行。
"""
if strategy not in ("sequential", "thread", "async"):
raise ValueError(f"unknown strategy {strategy!r}; expected 'sequential', 'thread', or 'async'.")
normalized = _normalize_strategy(strategy)
graph.validate()
layers = graph.layers()
@@ -345,17 +433,20 @@ def run(
_print_dry_run(graph, layers)
return RunReport(success=True)
# verbose 模式下包装事件回调
effective_callback: Optional[EventCallback] = _make_verbose_callback(on_event) if verbose else on_event
backend = resolve_backend(state)
report = RunReport()
context: Dict[str, Any] = {}
try:
if strategy == "sequential":
_drive_sequential(graph, layers, context, report, backend, on_event)
elif strategy == "thread":
_drive_threaded(graph, layers, context, report, backend, on_event, max_workers)
if normalized == Strategy.SEQUENTIAL:
_drive_sequential(graph, layers, context, report, backend, effective_callback)
elif normalized == Strategy.THREAD:
_drive_threaded(graph, layers, context, report, backend, effective_callback, max_workers)
else:
_drive_async(graph, layers, context, report, backend, on_event)
_drive_async(graph, layers, context, report, backend, effective_callback)
except TaskFailedError:
report.success = False
raise
+27 -5
View File
@@ -107,6 +107,10 @@ class TaskSpec(Generic[T]):
cwd:
命令执行的工作目录,仅在使用 ``cmd`` 参数时有效。
``None`` 表示当前目录。
verbose:
是否在命令执行时显示详细输出。``True`` 时会打印执行的命令
及其标准输出/标准错误。仅在使用 ``cmd`` 参数时有效。
``False`` 时静默捕获输出(失败时仍会包含在错误信息中)。
"""
name: str
@@ -120,6 +124,7 @@ class TaskSpec(Generic[T]):
tags: Tuple[str, ...] = ()
conditions: Tuple[Condition, ...] = ()
cwd: Optional[Path] = None
verbose: bool = False
def __post_init__(self) -> None:
if not self.name:
@@ -157,6 +162,7 @@ class TaskSpec(Generic[T]):
cmd = self.cmd
cwd = self.cwd
timeout = self.timeout
verbose = self.verbose
if isinstance(cmd, list):
@@ -164,12 +170,16 @@ class TaskSpec(Generic[T]):
import subprocess
cmd_str = " ".join(str(arg) for arg in cmd)
if verbose:
print(f"[verbose] 执行命令: {cmd_str}", flush=True)
if cwd is not None:
print(f"[verbose] 工作目录: {cwd}", flush=True)
try:
result = subprocess.run(
cmd,
cwd=cwd,
timeout=timeout,
capture_output=True,
capture_output=not verbose,
text=True,
check=False,
)
@@ -180,11 +190,14 @@ class TaskSpec(Generic[T]):
except OSError as e:
raise RuntimeError(f"命令执行异常: {cmd_str}: {e}")
if verbose:
print(f"[verbose] 返回码: {result.returncode}", flush=True)
if result.returncode == 0:
return None # type: ignore[return-value]
err_msg = f"命令执行失败: `{cmd_str}`, 返回码: {result.returncode}"
if result.stderr.strip():
if not verbose and result.stderr.strip():
err_msg += f"\n{result.stderr.strip()}"
raise RuntimeError(err_msg)
@@ -196,13 +209,17 @@ class TaskSpec(Generic[T]):
def _run_shell() -> T:
import subprocess
if verbose:
print(f"[verbose] 执行 Shell: {cmd}", flush=True)
if cwd is not None:
print(f"[verbose] 工作目录: {cwd}", flush=True)
try:
result = subprocess.run(
cmd,
shell=True,
cwd=cwd,
timeout=timeout,
capture_output=True,
capture_output=not verbose,
text=True,
check=False,
)
@@ -213,11 +230,14 @@ class TaskSpec(Generic[T]):
except OSError as e:
raise RuntimeError(f"Shell 命令执行异常: {cmd}: {e}")
if verbose:
print(f"[verbose] 返回码: {result.returncode}", flush=True)
if result.returncode == 0:
return None # type: ignore[return-value]
err_msg = f"Shell 命令执行失败: `{cmd}`, 返回码: {result.returncode}"
if result.stderr.strip():
if not verbose and result.stderr.strip():
err_msg += f"\n{result.stderr.strip()}"
raise RuntimeError(err_msg)
@@ -227,7 +247,9 @@ class TaskSpec(Generic[T]):
if callable(cmd):
return cmd # type: ignore[return-value]
raise TypeError(f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}")
raise TypeError(
f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}"
)
def should_execute(self) -> bool:
"""检查任务是否应该执行.