From 13f6110b18548ae5236829d8cdfea744ae25f246 Mon Sep 17 00:00:00 2001 From: gooker_young Date: Sat, 20 Jun 2026 17:20:05 +0800 Subject: [PATCH] =?UTF-8?q?refactor(executors):=20=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=99=A8=E7=AD=96=E7=95=A5=E4=B8=BA=E6=9E=9A?= =?UTF-8?q?=E4=B8=BE=E7=B1=BB=E5=9E=8B=E5=B9=B6=E5=A2=9E=E5=BC=BACLI?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将 Strategy 从字符串字面量改为枚举类型,提供 SEQUENTIAL、THREAD 和 ASYNC 选项 - 添加策略归一化函数 _normalize_strategy,支持字符串和枚举类型的输入 - 重构 run 函数接受新的 Strategy 枚举类型,默认值改为 Strategy.SEQUENTIAL - 添加 verbose 模式支持,在任务执行时打印生命周期信息 - 实现命令行运行器 CliRunner,提供命令行界面和参数解析功能 - 为 TaskSpec 添加 verbose 字段,控制子进程命令的详细输出 - 重构 pymake CLI 实现,使用新的命令行运行器架构 - 更新测试用例中的 depends_on 参数语法 --- .gitignore | 1 + src/pyflowx/__init__.py | 5 +- src/pyflowx/cli/__init__.py | 5 + src/pyflowx/cli/pymake.py | 520 +++++++++++++++++++----------------- src/pyflowx/cli/runner.py | 300 +++++++++++++++++++++ src/pyflowx/executors.py | 115 +++++++- src/pyflowx/task.py | 32 ++- tests/test_context.py | 22 +- tests/test_executors.py | 22 +- tests/test_graph.py | 36 +-- tests/test_runner.py | 277 +++++++++++++++---- 11 files changed, 986 insertions(+), 349 deletions(-) create mode 100644 src/pyflowx/cli/runner.py diff --git a/.gitignore b/.gitignore index 69f8916..8ad8793 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ wheels/ # Virtual environments .venv .coverage +.idea diff --git a/src/pyflowx/__init__.py b/src/pyflowx/__init__.py index d20b4f4..48289a1 100644 --- a/src/pyflowx/__init__.py +++ b/src/pyflowx/__init__.py @@ -71,10 +71,10 @@ from .errors import ( TaskFailedError, TaskTimeoutError, ) -from .executors import run +from .executors import Strategy, run from .graph import Graph from .report import RunReport -from .runner import CliExitCode, CliRunner +from .cli import CliExitCode, CliRunner from .storage import JSONBackend, MemoryBackend, StateBackend from .task import TaskCmd, TaskEvent, TaskResult, TaskSpec, TaskStatus @@ -92,6 +92,7 @@ __all__ = [ "RunReport", # 执行 "run", + "Strategy", # CLI 运行器 "CliRunner", "CliExitCode", diff --git a/src/pyflowx/cli/__init__.py b/src/pyflowx/cli/__init__.py index e69de29..a24a5da 100644 --- a/src/pyflowx/cli/__init__.py +++ b/src/pyflowx/cli/__init__.py @@ -0,0 +1,5 @@ +"""命令行运行器子包.""" + +from .runner import CliExitCode, CliRunner + +__all__ = ["CliRunner", "CliExitCode"] diff --git a/src/pyflowx/cli/pymake.py b/src/pyflowx/cli/pymake.py index 93bbc89..9c60bc8 100644 --- a/src/pyflowx/cli/pymake.py +++ b/src/pyflowx/cli/pymake.py @@ -82,270 +82,312 @@ def _build_graphs() -> dict[str, px.Graph]: return { # === 构建命令 === # 构建 Python 包 - "b": px.Graph.from_specs([ - px.TaskSpec( - "uv_build", - cmd=conf.BUILD_COMMAND, - conditions=(_UV_CONDITION,), - timeout=conf.TIMEOUT, - ), - ]), + "b": px.Graph.from_specs( + [ + px.TaskSpec( + "uv_build", + cmd=conf.BUILD_COMMAND, + conditions=(_UV_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), # 构建 Rust 核心模块 - "bc": px.Graph.from_specs([ - px.TaskSpec( - "maturin_build", - cmd=_get_maturin_build_command(), - cwd=Path(conf.CORE_DIR), - conditions=(_MATURIN_CONDITION,), - timeout=conf.TIMEOUT, - ), - ]), + "bc": px.Graph.from_specs( + [ + px.TaskSpec( + "maturin_build", + cmd=_get_maturin_build_command(), + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), # 构建双包(先 Rust 后 Python) - "ba": px.Graph.from_specs([ - px.TaskSpec( - "maturin_build", - cmd=_get_maturin_build_command(), - cwd=Path(conf.CORE_DIR), - conditions=(_MATURIN_CONDITION,), - timeout=conf.TIMEOUT, - ), - px.TaskSpec( - "uv_build", - cmd=conf.BUILD_COMMAND, - conditions=(_UV_CONDITION,), - timeout=conf.TIMEOUT, - depends_on=("maturin_build",), - ), - ]), + "ba": px.Graph.from_specs( + [ + px.TaskSpec( + "maturin_build", + cmd=_get_maturin_build_command(), + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + timeout=conf.TIMEOUT, + ), + px.TaskSpec( + "uv_build", + cmd=conf.BUILD_COMMAND, + conditions=(_UV_CONDITION,), + timeout=conf.TIMEOUT, + depends_on=("maturin_build",), + ), + ] + ), # === 安装命令(开发模式) === # 安装 Rust 核心模块 - "ic": px.Graph.from_specs([ - px.TaskSpec( - "maturin_dev", - cmd=conf.MATURIN_DEV_COMMAND, - cwd=Path(conf.CORE_DIR), - conditions=(_MATURIN_CONDITION,), - ), - ]), + "ic": px.Graph.from_specs( + [ + px.TaskSpec( + "maturin_dev", + cmd=conf.MATURIN_DEV_COMMAND, + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + ), + ] + ), # 安装 Python 主包 - "ip": px.Graph.from_specs([ - px.TaskSpec( - "uv_install", - cmd=["uv", "pip", "install", "-e", "."], - conditions=(_UV_CONDITION,), - ), - ]), + "ip": px.Graph.from_specs( + [ + px.TaskSpec( + "uv_install", + cmd=["uv", "pip", "install", "-e", "."], + conditions=(_UV_CONDITION,), + ), + ] + ), # 安装双包(开发模式) - "ia": px.Graph.from_specs([ - px.TaskSpec( - "maturin_dev", - cmd=conf.MATURIN_DEV_COMMAND, - cwd=Path(conf.CORE_DIR), - conditions=(_MATURIN_CONDITION,), - ), - px.TaskSpec( - "uv_install", - cmd=["uv", "pip", "install", "-e", "."], - conditions=(_UV_CONDITION,), - depends_on=("maturin_dev",), - ), - ]), + "ia": px.Graph.from_specs( + [ + px.TaskSpec( + "maturin_dev", + cmd=conf.MATURIN_DEV_COMMAND, + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + ), + px.TaskSpec( + "uv_install", + cmd=["uv", "pip", "install", "-e", "."], + conditions=(_UV_CONDITION,), + depends_on=("maturin_dev",), + ), + ] + ), # === 清理命令 === # 清理 Python 构建产物 - "cp": px.Graph.from_specs([ - px.TaskSpec( - "git_clean_python", - cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE], - conditions=(_GIT_CONDITION,), - ), - ]), + "cp": px.Graph.from_specs( + [ + px.TaskSpec( + "git_clean_python", + cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE], + conditions=(_GIT_CONDITION,), + ), + ] + ), # 清理 Rust 构建产物 - "cc": px.Graph.from_specs([ - px.TaskSpec( - "cargo_clean", - cmd=["cargo", "clean"], - cwd=Path(conf.CORE_DIR), - conditions=(_MATURIN_CONDITION,), - ), - ]), + "cc": px.Graph.from_specs( + [ + px.TaskSpec( + "cargo_clean", + cmd=["cargo", "clean"], + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + ), + ] + ), # 清理所有构建产物 - "ca": px.Graph.from_specs([ - px.TaskSpec( - "cargo_clean", - cmd=["cargo", "clean"], - cwd=Path(conf.CORE_DIR), - conditions=(_MATURIN_CONDITION,), - ), - px.TaskSpec( - "git_clean", - cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE], - conditions=(_GIT_CONDITION,), - ), - ]), + "ca": px.Graph.from_specs( + [ + px.TaskSpec( + "cargo_clean", + cmd=["cargo", "clean"], + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + ), + px.TaskSpec( + "git_clean", + cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE], + conditions=(_GIT_CONDITION,), + ), + ] + ), # === 开发工具 === # 运行测试, 跳过 slow, 并行模式 - "t": px.Graph.from_specs([ - px.TaskSpec( - "pytest", - cmd=[ + "t": px.Graph.from_specs( + [ + px.TaskSpec( "pytest", - "-m", - "not slow", - "-n", - "8", - "--dist", - "loadfile", - "--color=yes", - "--durations=10", - ], - conditions=(_PYTEST_CONDITION,), - timeout=conf.TIMEOUT, - ), - ]), + cmd=[ + "pytest", + "-m", + "not slow", + "-n", + "8", + "--dist", + "loadfile", + "--color=yes", + "--durations=10", + ], + conditions=(_PYTEST_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), # 运行测试, 非并行模式 - "tf": px.Graph.from_specs([ - px.TaskSpec( - "pytest", - cmd=[ + "tf": px.Graph.from_specs( + [ + px.TaskSpec( "pytest", - "-m", - "not slow", - "--dist", - "loadfile", - "--color=yes", - "--durations=10", - ], - conditions=(_PYTEST_CONDITION,), - timeout=conf.TIMEOUT, - ), - ]), + cmd=[ + "pytest", + "-m", + "not slow", + "--dist", + "loadfile", + "--color=yes", + "--durations=10", + ], + conditions=(_PYTEST_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), # 运行测试并生成覆盖率报告, 跳过 slow, 并行模式 - "tc": px.Graph.from_specs([ - px.TaskSpec( - "pytest_cov", - cmd=[ - "pytest", - "-m", - "not slow", - "--cov", - "-n", - "auto", - "--dist", - "loadfile", - "--tb=short", - "-v", - "--color=yes", - "--durations=10", - ], - conditions=(_PYTEST_CONDITION,), - timeout=conf.TIMEOUT, - ), - ]), + "tc": px.Graph.from_specs( + [ + px.TaskSpec( + "pytest_cov", + cmd=[ + "pytest", + "-m", + "not slow", + "--cov", + "-n", + "auto", + "--dist", + "loadfile", + "--tb=short", + "-v", + "--color=yes", + "--durations=10", + ], + conditions=(_PYTEST_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), # 代码格式化与检查 - "lint": px.Graph.from_specs([ - px.TaskSpec( - "ruff_check", - cmd=[ - "ruff", - "check", - "--fix", - "--unsafe-fixes", - ], - conditions=(_RUFF_CONDITION,), - timeout=conf.TIMEOUT, - cwd=Path(conf.PROJECT_ROOT), - ), - ]), + "lint": px.Graph.from_specs( + [ + px.TaskSpec( + "ruff_check", + cmd=[ + "ruff", + "check", + "--fix", + "--unsafe-fixes", + ], + conditions=(_RUFF_CONDITION,), + timeout=conf.TIMEOUT, + cwd=Path(conf.PROJECT_ROOT), + ), + ] + ), # 类型检查 - "typecheck": px.Graph.from_specs([ - px.TaskSpec( - "ty_check", - cmd=["ty", "check", "src/bitool"], - conditions=(BuiltinConditions.HAS_APP_INSTALLED("ty"),), - ), - ]), + "typecheck": px.Graph.from_specs( + [ + px.TaskSpec( + "ty_check", + cmd=["ty", "check", "src/bitool"], + conditions=(BuiltinConditions.HAS_APP_INSTALLED("ty"),), + ), + ] + ), # 构建文档 - "doc": px.Graph.from_specs([ - px.TaskSpec( - "sphinx_build", - cmd=conf.DOC_BUILD_COMMAND, - conditions=(BuiltinConditions.HAS_APP_INSTALLED(conf.DOC_BUILD_TOOL),), - ), - ]), + "doc": px.Graph.from_specs( + [ + px.TaskSpec( + "sphinx_build", + cmd=conf.DOC_BUILD_COMMAND, + conditions=( + BuiltinConditions.HAS_APP_INSTALLED(conf.DOC_BUILD_TOOL), + ), + ), + ] + ), # === 发布命令 === # 发布 Python 主包到 PyPI - "pb": px.Graph.from_specs([ - px.TaskSpec( - "publish_python", - cmd=["hatch", "publish"], - cwd=Path(conf.PROJECT_ROOT), - conditions=(_HATCH_CONDITION,), - timeout=conf.TIMEOUT, - ), - ]), + "pb": px.Graph.from_specs( + [ + px.TaskSpec( + "publish_python", + cmd=["hatch", "publish"], + cwd=Path(conf.PROJECT_ROOT), + conditions=(_HATCH_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), # 发布所有包(先 Rust 后 Python) - "pba": px.Graph.from_specs([ - px.TaskSpec( - "publish_rust", - cmd=[ - "twine", - "upload", - "--disable-progress-bar", - conf.CORE_PATTERN, - ], - cwd=Path(conf.CORE_DIR), - conditions=(_MATURIN_CONDITION,), - timeout=conf.TIMEOUT, - ), - px.TaskSpec( - "publish_python", - cmd=["hatch", "publish"], - cwd=Path(conf.PROJECT_ROOT), - conditions=(_HATCH_CONDITION,), - timeout=conf.TIMEOUT, - depends_on=("publish_rust",), - ), - ]), + "pba": px.Graph.from_specs( + [ + px.TaskSpec( + "publish_rust", + cmd=[ + "twine", + "upload", + "--disable-progress-bar", + conf.CORE_PATTERN, + ], + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + timeout=conf.TIMEOUT, + ), + px.TaskSpec( + "publish_python", + cmd=["hatch", "publish"], + cwd=Path(conf.PROJECT_ROOT), + conditions=(_HATCH_CONDITION,), + timeout=conf.TIMEOUT, + depends_on=("publish_rust",), + ), + ] + ), # 发布 Rust 核心模块 (maturin publish) - "pbc": px.Graph.from_specs([ - px.TaskSpec( - "publish_rust", - cmd=["maturin", "publish"], - cwd=Path(conf.CORE_DIR), - conditions=(_MATURIN_CONDITION,), - timeout=conf.TIMEOUT, - ), - ]), + "pbc": px.Graph.from_specs( + [ + px.TaskSpec( + "publish_rust", + cmd=["maturin", "publish"], + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), # === 多版本测试命令 === # 运行多版本 Python 测试 (tox) - "tox": px.Graph.from_specs([ - px.TaskSpec( - "tox_run", - cmd=["tox", "-p", "auto"], - conditions=(_TOX_CONDITION,), - timeout=conf.TIMEOUT, - ), - ]), + "tox": px.Graph.from_specs( + [ + px.TaskSpec( + "tox_run", + cmd=["tox", "-p", "auto"], + conditions=(_TOX_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), # 安装多版本 Python (仅安装不测试) - "tox-install": px.Graph.from_specs([ - px.TaskSpec( - "uv_python_install", - cmd=[ - "uv", - "python", - "install", - "3.8", - "3.9", - "3.10", - "3.11", - "3.12", - "3.13", - "3.14", - ], - conditions=(_UV_CONDITION,), - timeout=600, - ), - ]), + "tox-install": px.Graph.from_specs( + [ + px.TaskSpec( + "uv_python_install", + cmd=[ + "uv", + "python", + "install", + "3.8", + "3.9", + "3.10", + "3.11", + "3.12", + "3.13", + "3.14", + ], + conditions=(_UV_CONDITION,), + timeout=600, + ), + ] + ), } @@ -405,6 +447,6 @@ def main(): runner = px.CliRunner( strategy=px.Strategy.SEQUENTIAL, description="PyMake - Python 构建工具 (替代 Makefile)", - graphs=**_build_graphs(), + **_build_graphs(), ) runner.run_cli() diff --git a/src/pyflowx/cli/runner.py b/src/pyflowx/cli/runner.py new file mode 100644 index 0000000..89492c5 --- /dev/null +++ b/src/pyflowx/cli/runner.py @@ -0,0 +1,300 @@ +"""命令行运行器:根据用户输入执行对应的任务流图. + +参考 bitool_skill 的 MapSkill 设计, 将命令名映射到 Graph 实例, +通过 argparse 解析用户输入的命令并执行对应的图. + +与 bitool_skill.MapSkill 的区别: +- MapSkill 通过继承 + create_scheduler_map 构建命令映射 +- CliRunner 通过关键字参数直接注入命令到图的映射, 更声明式 +- CliRunner 复用 pyflowx 的 DAG 调度能力 (run/Graph/TaskSpec) + +verbose 模式 +------------ +``CliRunner`` 默认 ``verbose=True``, 会: +1. 打印任务生命周期 (开始/成功/失败/跳过) 到 stdout +2. 对 ``cmd`` 类任务, 显示执行的命令及其标准输出/标准错误 + +可通过构造参数 ``verbose=False`` 或命令行 ``--quiet`` 关闭. +""" + +from __future__ import annotations + +import argparse +import dataclasses +import enum +import sys +from typing import Dict, List, Optional, Sequence, Union + +from ..errors import PyFlowXError +from ..executors import Strategy, _normalize_strategy, run +from ..graph import Graph + +__all__ = ["CliRunner", "CliExitCode"] + + +class CliExitCode(enum.IntEnum): + """CliRunner 退出码.""" + + SUCCESS = 0 + FAILURE = 1 + INTERRUPTED = 130 # 与 POSIX 信号中断一致 + + +def _apply_verbose_to_graph(graph: Graph, verbose: bool) -> Graph: + """创建新图, 其中所有 TaskSpec 的 verbose 字段被设置为指定值. + + 使用 ``dataclasses.replace`` 在不可变的 TaskSpec 上创建带 verbose 标记的副本. + 依赖关系、标签等元数据全部保留. + + Parameters + ---------- + graph : Graph + 原始图. + verbose : bool + 要设置的 verbose 值. + + Returns + ------- + Graph + 所有 spec 的 verbose 字段已更新的新图. + """ + new_specs = [] + for spec in graph.all_specs().values(): + if spec.verbose == verbose: + new_specs.append(spec) + else: + new_specs.append(dataclasses.replace(spec, verbose=verbose)) + return Graph.from_specs(new_specs) + + +class CliRunner: + """命令行运行器: 根据用户输入执行对应的任务流图. + + 参考 bitool_skill 的 MapSkill 设计, 将命令名映射到 Graph 实例. + 通过 ``sys.argv`` 解析用户输入的命令, 执行对应的图. + + Parameters + ---------- + strategy : str | Strategy + 默认执行策略 (``Strategy.SEQUENTIAL`` / ``Strategy.THREAD`` / + ``Strategy.ASYNC`` 或对应字符串). 可被命令行 ``--strategy`` 覆盖. + description : str + CLI 描述文本, 显示在 ``--help`` 中. + verbose : bool + 是否显示详细执行过程. ``True`` 时打印任务生命周期和 subprocess 输出. + 默认 ``True``. 可被命令行 ``--quiet`` 关闭. + **graphs : Graph + 命令名到图的映射. 每个 key 是一个命令名, value 是对应的 + :class:`~pyflowx.graph.Graph`. + + Examples + -------- + 基本用法:: + + runner = px.CliRunner( + clean=px.Graph.from_specs([ + px.TaskSpec("cargo_clean", cmd=["cargo", "clean"]), + ]), + build=px.Graph.from_specs([ + px.TaskSpec("uv_build", cmd=["uv", "build"]), + ]), + ) + runner.run() # 解析 sys.argv + + 指定策略与描述:: + + runner = px.CliRunner( + strategy=px.Strategy.THREAD, + description="My build tool", + test=px.Graph.from_specs([...]), + ) + runner.run(["test", "--strategy", "sequential"]) + """ + + def __init__( + self, + *, + strategy: Union[str, Strategy] = Strategy.SEQUENTIAL, + description: str = "", + verbose: bool = True, + **graphs: Graph, + ) -> None: + if not graphs: + raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)") + # 校验所有值都是 Graph + for name, graph in graphs.items(): + if not isinstance(graph, Graph): + raise TypeError(f"CliRunner 命令 {name!r} 的值必须是 Graph 实例, 实际是 {type(graph).__name__}") + self._graphs: Dict[str, Graph] = dict(graphs) + self._strategy: Strategy = _normalize_strategy(strategy) + self._description: str = description + self._verbose: bool = verbose + + # ------------------------------------------------------------------ # + # 内省 + # ------------------------------------------------------------------ # + @property + def commands(self) -> List[str]: + """可用的命令列表 (按插入顺序).""" + return list(self._graphs.keys()) + + @property + def graphs(self) -> Dict[str, Graph]: + """命令名到图的映射 (只读副本).""" + return dict(self._graphs) + + @property + def strategy(self) -> Strategy: + """默认执行策略.""" + return self._strategy + + @property + def description(self) -> str: + """CLI 描述文本.""" + return self._description + + @property + def verbose(self) -> bool: + """是否显示详细执行过程.""" + return self._verbose + + # ------------------------------------------------------------------ # + # 参数解析 + # ------------------------------------------------------------------ # + def _prog_name(self) -> str: + """从 sys.argv[0] 推导程序名.""" + import os + + return os.path.basename(sys.argv[0]) if sys.argv else "pyflowx" + + def create_parser(self) -> argparse.ArgumentParser: + """创建参数解析器. + + 子类可覆盖此方法以添加自定义参数. 覆盖时应保留 ``command`` + 位置参数与 ``--strategy`` / ``--dry-run`` / ``--list`` / ``--quiet`` + 选项, 否则 :meth:`run` 的默认逻辑可能失效. + + Returns + ------- + argparse.ArgumentParser + 新创建的参数解析器实例. + """ + parser = argparse.ArgumentParser( + prog=self._prog_name(), + description=self._description or "PyFlowX CLI Runner", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=self._format_commands_help(), + ) + parser.add_argument( + "command", + nargs="?", + help="要执行的命令", + ) + parser.add_argument( + "--strategy", + choices=[s.value for s in Strategy], + default=self._strategy.value, + help="执行策略 (默认: %(default)s)", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="只打印执行计划, 不实际运行", + ) + parser.add_argument( + "--list", + action="store_true", + help="列出所有可用命令", + ) + parser.add_argument( + "--quiet", + action="store_true", + help="静默模式, 不显示执行过程 (覆盖默认 verbose)", + ) + return parser + + def _format_commands_help(self) -> str: + """格式化命令帮助文本.""" + lines = ["可用命令:"] + for cmd in self._graphs: + lines.append(f" {cmd}") + return "\n".join(lines) + + # ------------------------------------------------------------------ # + # 执行 + # ------------------------------------------------------------------ # + def run(self, args: Optional[Sequence[str]] = None) -> int: + """解析参数并执行对应的图. + + Parameters + ---------- + args : Sequence[str] | None + 参数列表, 默认使用 ``sys.argv[1:]``. + + Returns + ------- + int + 退出码 (0 成功, 1 失败, 130 中断). + + Raises + ------ + SystemExit + 当 argparse 无法解析参数时 (与标准 argparse 行为一致). + """ + parser = self.create_parser() + parsed = parser.parse_args(args) + + # --list: 列出命令 + if parsed.list: + print(self._format_commands_help()) + return CliExitCode.SUCCESS.value + + # 无命令: 显示帮助 + if not parsed.command: + parser.print_help() + return CliExitCode.FAILURE.value + + # 验证命令 + if parsed.command not in self._graphs: + available = ", ".join(self._graphs.keys()) + print( + f"错误: 未知命令 {parsed.command!r} (可用命令: {available})", + file=sys.stderr, + ) + return CliExitCode.FAILURE.value + + # 确定是否 verbose: --quiet 覆盖默认值 + verbose = self._verbose and not parsed.quiet + + # 对图应用 verbose 设置 (重建带 verbose 标记的 spec) + graph = self._graphs[parsed.command] + if verbose: + graph = _apply_verbose_to_graph(graph, verbose=True) + + # 执行对应的图 + try: + report = run( + graph, + strategy=parsed.strategy, + dry_run=parsed.dry_run, + verbose=verbose, + ) + return CliExitCode.SUCCESS.value if report.success else CliExitCode.FAILURE.value + except KeyboardInterrupt: + print("\n操作已取消", file=sys.stderr) + return CliExitCode.INTERRUPTED.value + except PyFlowXError as e: + print(f"错误: {e}", file=sys.stderr) + return CliExitCode.FAILURE.value + + def run_cli(self, args: Optional[Sequence[str]] = None) -> None: + """运行并以退出码退出进程. + + 作为 CLI 工具运行时的入口点, 等价于 ``sys.exit(self.run(args))``. + + Parameters + ---------- + args : Sequence[str] | None + 参数列表, 默认使用 ``sys.argv[1:]``. + """ + sys.exit(self.run(args)) diff --git a/src/pyflowx/executors.py b/src/pyflowx/executors.py index ff08782..c70bd9e 100644 --- a/src/pyflowx/executors.py +++ b/src/pyflowx/executors.py @@ -16,10 +16,11 @@ from __future__ import annotations import asyncio import concurrent.futures +import enum import inspect import logging from datetime import datetime -from typing import Any, Awaitable, Callable, Dict, List, Mapping, Optional, cast +from typing import Any, Awaitable, Callable, Dict, List, Mapping, Optional, Union, cast from .context import build_call_args, describe_injection from .errors import TaskFailedError, TaskTimeoutError @@ -33,8 +34,53 @@ logger = logging.getLogger("pyflowx") # 观察者回调类型。 EventCallback = Callable[[TaskEvent], None] -# 策略选择字面量。 -Strategy = str # "sequential" | "thread" | "async" + +class Strategy(enum.Enum): + """任务图执行策略. + + Members + ------- + SEQUENTIAL + 顺序执行: 逐个运行任务, 确定性最高, 适合调试. + THREAD + 线程池执行: 层内任务通过线程池并发, 适合 I/O 密集型同步任务. + ASYNC + 异步执行: 通过 ``asyncio.gather`` 实现层内并发, 适合 I/O 密集型异步任务. + """ + + SEQUENTIAL = "sequential" + THREAD = "thread" + ASYNC = "async" + + +def _normalize_strategy(strategy: Union[str, Strategy]) -> Strategy: + """将字符串或 Strategy 归一化为 Strategy 枚举. + + Parameters + ---------- + strategy : str | Strategy + 策略值, 接受字符串 (``"sequential"`` / ``"thread"`` / ``"async"``) + 或 :class:`Strategy` 枚举成员. + + Returns + ------- + Strategy + 归一化后的枚举成员. + + Raises + ------ + ValueError + 策略不被识别时. + """ + if isinstance(strategy, Strategy): + return strategy + if isinstance(strategy, str): + try: + return Strategy(strategy) + except ValueError: + valid = ", ".join(repr(s.value) for s in Strategy) + raise ValueError(f"unknown strategy {strategy!r}; expected one of {valid}.") from None + raise TypeError(f"strategy must be str or Strategy, got {type(strategy).__name__}") def _is_async_fn(spec: TaskSpec[object]) -> bool: @@ -299,12 +345,51 @@ async def _execute_layer_async( # ---------------------------------------------------------------------- # # 公共 API # ---------------------------------------------------------------------- # +def _make_verbose_callback( + on_event: Optional[EventCallback], +) -> Optional[EventCallback]: + """包装 on_event 回调, 在 verbose 模式下打印任务生命周期. + + Parameters + ---------- + on_event : EventCallback | None + 用户提供的原始回调, 若为 None 则仅打印. + + Returns + ------- + EventCallback | None + 包装后的回调. + """ + + def _verbose_callback(event: TaskEvent) -> None: + # 先打印生命周期信息 + dur = f" ({event.duration:.3f}s)" if event.duration is not None else "" + if event.status == TaskStatus.RUNNING: + print(f"[verbose] 任务 {event.task!r} 开始执行...", flush=True) + elif event.status == TaskStatus.SUCCESS: + print(f"[verbose] 任务 {event.task!r} 成功{dur}", flush=True) + elif event.status == TaskStatus.FAILED: + err = f": {event.error}" if event.error else "" + print( + f"[verbose] 任务 {event.task!r} 失败{dur} (尝试 {event.attempts} 次){err}", + flush=True, + ) + elif event.status == TaskStatus.SKIPPED: + print(f"[verbose] 任务 {event.task!r} 跳过", flush=True) + # 再调用用户回调 + if on_event is not None: + on_event(event) + + return _verbose_callback + + def run( graph: Graph, - strategy: Strategy = "sequential", + strategy: Union[str, Strategy] = Strategy.SEQUENTIAL, *, max_workers: Optional[int] = None, dry_run: bool = False, + verbose: bool = False, on_event: Optional[EventCallback] = None, state: Optional[StateBackend] = None, ) -> RunReport: @@ -315,12 +400,16 @@ def run( graph: 待执行的已校验 :class:`Graph`。 strategy: - ``"sequential"``(默认)、``"thread"`` 或 ``"async"``。 + 执行策略, 接受 :class:`Strategy` 枚举成员或字符串 + (``"sequential"`` / ``"thread"`` / ``"async"``). 默认 ``Strategy.SEQUENTIAL``. max_workers: ``"thread"`` 的线程池大小。默认 ``min(32, len(layer))``。 dry_run: 若为 ``True``,打印执行计划(层 + 注入)并返回空报告,不执行 任何任务。 + verbose: + 若为 ``True``, 打印任务生命周期 (开始/成功/失败/跳过) 到 stdout. + 注意: subprocess 命令的输出由 :class:`TaskSpec` 的 ``verbose`` 字段控制. on_event: 可选回调,在每次状态转换时调用。 state: @@ -335,8 +424,7 @@ def run( 任何任务耗尽重试后仍失败时。运行在失败层中止;后续层的任务 不会被执行。 """ - if strategy not in ("sequential", "thread", "async"): - raise ValueError(f"unknown strategy {strategy!r}; expected 'sequential', 'thread', or 'async'.") + normalized = _normalize_strategy(strategy) graph.validate() layers = graph.layers() @@ -345,17 +433,20 @@ def run( _print_dry_run(graph, layers) return RunReport(success=True) + # verbose 模式下包装事件回调 + effective_callback: Optional[EventCallback] = _make_verbose_callback(on_event) if verbose else on_event + backend = resolve_backend(state) report = RunReport() context: Dict[str, Any] = {} try: - if strategy == "sequential": - _drive_sequential(graph, layers, context, report, backend, on_event) - elif strategy == "thread": - _drive_threaded(graph, layers, context, report, backend, on_event, max_workers) + if normalized == Strategy.SEQUENTIAL: + _drive_sequential(graph, layers, context, report, backend, effective_callback) + elif normalized == Strategy.THREAD: + _drive_threaded(graph, layers, context, report, backend, effective_callback, max_workers) else: - _drive_async(graph, layers, context, report, backend, on_event) + _drive_async(graph, layers, context, report, backend, effective_callback) except TaskFailedError: report.success = False raise diff --git a/src/pyflowx/task.py b/src/pyflowx/task.py index fe646db..6e2fed9 100644 --- a/src/pyflowx/task.py +++ b/src/pyflowx/task.py @@ -107,6 +107,10 @@ class TaskSpec(Generic[T]): cwd: 命令执行的工作目录,仅在使用 ``cmd`` 参数时有效。 ``None`` 表示当前目录。 + verbose: + 是否在命令执行时显示详细输出。``True`` 时会打印执行的命令 + 及其标准输出/标准错误。仅在使用 ``cmd`` 参数时有效。 + ``False`` 时静默捕获输出(失败时仍会包含在错误信息中)。 """ name: str @@ -120,6 +124,7 @@ class TaskSpec(Generic[T]): tags: Tuple[str, ...] = () conditions: Tuple[Condition, ...] = () cwd: Optional[Path] = None + verbose: bool = False def __post_init__(self) -> None: if not self.name: @@ -157,6 +162,7 @@ class TaskSpec(Generic[T]): cmd = self.cmd cwd = self.cwd timeout = self.timeout + verbose = self.verbose if isinstance(cmd, list): @@ -164,12 +170,16 @@ class TaskSpec(Generic[T]): import subprocess cmd_str = " ".join(str(arg) for arg in cmd) + if verbose: + print(f"[verbose] 执行命令: {cmd_str}", flush=True) + if cwd is not None: + print(f"[verbose] 工作目录: {cwd}", flush=True) try: result = subprocess.run( cmd, cwd=cwd, timeout=timeout, - capture_output=True, + capture_output=not verbose, text=True, check=False, ) @@ -180,11 +190,14 @@ class TaskSpec(Generic[T]): except OSError as e: raise RuntimeError(f"命令执行异常: {cmd_str}: {e}") + if verbose: + print(f"[verbose] 返回码: {result.returncode}", flush=True) + if result.returncode == 0: return None # type: ignore[return-value] err_msg = f"命令执行失败: `{cmd_str}`, 返回码: {result.returncode}" - if result.stderr.strip(): + if not verbose and result.stderr.strip(): err_msg += f"\n{result.stderr.strip()}" raise RuntimeError(err_msg) @@ -196,13 +209,17 @@ class TaskSpec(Generic[T]): def _run_shell() -> T: import subprocess + if verbose: + print(f"[verbose] 执行 Shell: {cmd}", flush=True) + if cwd is not None: + print(f"[verbose] 工作目录: {cwd}", flush=True) try: result = subprocess.run( cmd, shell=True, cwd=cwd, timeout=timeout, - capture_output=True, + capture_output=not verbose, text=True, check=False, ) @@ -213,11 +230,14 @@ class TaskSpec(Generic[T]): except OSError as e: raise RuntimeError(f"Shell 命令执行异常: {cmd}: {e}") + if verbose: + print(f"[verbose] 返回码: {result.returncode}", flush=True) + if result.returncode == 0: return None # type: ignore[return-value] err_msg = f"Shell 命令执行失败: `{cmd}`, 返回码: {result.returncode}" - if result.stderr.strip(): + if not verbose and result.stderr.strip(): err_msg += f"\n{result.stderr.strip()}" raise RuntimeError(err_msg) @@ -227,7 +247,9 @@ class TaskSpec(Generic[T]): if callable(cmd): return cmd # type: ignore[return-value] - raise TypeError(f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}") + raise TypeError( + f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}" + ) def should_execute(self) -> bool: """检查任务是否应该执行. diff --git a/tests/test_context.py b/tests/test_context.py index 4760801..162d6cc 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -15,7 +15,7 @@ def test_inject_by_parameter_name() -> None: def fn(a: int, b: str) -> str: return f"{a}{b}" - spec = px.TaskSpec("c", fn, ("a", "b")) + spec = px.TaskSpec("c", fn, depends_on=("a", "b")) args, kwargs = build_call_args(spec, {"a": 1, "b": "x"}) assert args == () assert kwargs == {"a": 1, "b": "x"} @@ -25,7 +25,7 @@ def test_inject_context_annotation() -> None: def fn(ctx: px.Context) -> int: return len(ctx) - spec = px.TaskSpec("agg", fn, ("a", "b")) + spec = px.TaskSpec("agg", fn, depends_on=("a", "b")) args, kwargs = build_call_args(spec, {"a": 1, "b": 2, "c": 99}) # Only the task's own deps are passed. assert kwargs == {"ctx": {"a": 1, "b": 2}} @@ -35,7 +35,7 @@ def test_inject_var_keyword() -> None: def fn(**kwargs: Any) -> int: return sum(kwargs.values()) - spec = px.TaskSpec("agg", fn, ("a", "b")) + spec = px.TaskSpec("agg", fn, depends_on=("a", "b")) args, kwargs = build_call_args(spec, {"a": 1, "b": 2}) assert kwargs == {"a": 1, "b": 2} @@ -54,7 +54,7 @@ def test_default_param_not_required() -> None: def fn(a: int, flag: bool = True) -> int: return a if flag else 0 - spec = px.TaskSpec("t", fn, ("a",)) + spec = px.TaskSpec("t", fn, depends_on=("a",)) args, kwargs = build_call_args(spec, {"a": 5}) assert kwargs == {"a": 5} @@ -63,7 +63,7 @@ def test_unresolved_required_param_raises() -> None: def fn(a: int, missing: str) -> None: return None - spec = px.TaskSpec("t", fn, ("a",)) + spec = px.TaskSpec("t", fn, depends_on=("a",)) with pytest.raises(InjectionError) as exc_info: build_call_args(spec, {"a": 1}) assert "missing" in str(exc_info.value) @@ -73,7 +73,7 @@ def test_static_kwargs_collide_with_dependency() -> None: def fn(a: int) -> int: return a - spec = px.TaskSpec("t", fn, ("a",), kwargs={"a": 99}) + spec = px.TaskSpec("t", fn, depends_on=("a",), kwargs={"a": 99}) with pytest.raises(InjectionError): build_call_args(spec, {"a": 1}) @@ -82,7 +82,7 @@ def test_describe_injection() -> None: def fn(a: int, ctx: px.Context, flag: bool = False) -> None: return None - spec = px.TaskSpec("t", fn, ("a",)) + spec = px.TaskSpec("t", fn, depends_on=("a",)) desc = describe_injection(spec) assert "a=" in desc assert "ctx=" in desc @@ -147,7 +147,7 @@ def test_describe_injection_var_keyword() -> None: def fn(**kwargs: Any) -> None: return None - spec = px.TaskSpec("t", fn, ("a",)) + spec = px.TaskSpec("t", fn, depends_on=("a",)) desc = describe_injection(spec) assert "**kwargs=" in desc @@ -207,7 +207,7 @@ def test_build_call_args_var_keyword_consumes_leftover() -> None: def fn(a: int, **rest: Any) -> int: return a + sum(rest.values()) - spec = px.TaskSpec("t", fn, ("a", "b", "c")) + spec = px.TaskSpec("t", fn, depends_on=("a", "b", "c")) args, kwargs = build_call_args(spec, {"a": 1, "b": 2, "c": 3}) assert kwargs == {"a": 1, "b": 2, "c": 3} @@ -218,7 +218,7 @@ def test_build_call_args_no_var_keyword_drops_leftover() -> None: def fn(a: int) -> int: return a - spec = px.TaskSpec("t", fn, ("a", "b")) + spec = px.TaskSpec("t", fn, depends_on=("a", "b")) # b 是依赖但 fn 不接收它 —— 应正常工作 args, kwargs = build_call_args(spec, {"a": 1, "b": 2}) assert kwargs == {"a": 1} @@ -230,6 +230,6 @@ def test_build_call_args_context_annotation_only_deps() -> None: def fn(ctx: px.Context) -> int: return len(ctx) - spec = px.TaskSpec("t", fn, ("a", "b")) + spec = px.TaskSpec("t", fn, depends_on=("a", "b")) args, kwargs = build_call_args(spec, {"a": 1, "b": 2, "c": 99}) assert kwargs == {"ctx": {"a": 1, "b": 2}} diff --git a/tests/test_executors.py b/tests/test_executors.py index aa09fc6..95a88f7 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -29,7 +29,7 @@ def test_sequential_basic() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("extract", extract), - px.TaskSpec("double", double, ("extract",)), + px.TaskSpec("double", double, depends_on=("extract",)), ] ) report = px.run(graph, strategy="sequential") @@ -51,9 +51,9 @@ def test_sequential_diamond() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("a", make("a")), - px.TaskSpec("b", make("b"), ("a",)), - px.TaskSpec("c", make("c"), ("a",)), - px.TaskSpec("d", make("d"), ("b", "c")), + px.TaskSpec("b", make("b"), depends_on=("a",)), + px.TaskSpec("c", make("c"), depends_on=("a",)), + px.TaskSpec("d", make("d"), depends_on=("b", "c")), ] ) report = px.run(graph, strategy="sequential") @@ -72,7 +72,7 @@ def test_failure_propagates() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("boom", boom), - px.TaskSpec("downstream", downstream, ("boom",)), + px.TaskSpec("downstream", downstream, depends_on=("boom",)), ] ) with pytest.raises(TaskFailedError) as exc_info: @@ -147,7 +147,7 @@ def test_threaded_layer_barrier() -> None: [ px.TaskSpec("a", make("a")), px.TaskSpec("b", make("b")), - px.TaskSpec("c", make("c"), ("a", "b")), + px.TaskSpec("c", make("c"), depends_on=("a", "b")), ] ) report = px.run(graph, strategy="thread", max_workers=2) @@ -171,7 +171,7 @@ def test_async_basic() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("fetch", fetch), - px.TaskSpec("transform", transform, ("fetch",)), + px.TaskSpec("transform", transform, depends_on=("fetch",)), ] ) report = px.run(graph, strategy="async") @@ -209,7 +209,7 @@ def test_async_mixed_sync_and_async() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("sync_task", sync_task), - px.TaskSpec("async_task", async_task, ("sync_task",)), + px.TaskSpec("async_task", async_task, depends_on=("sync_task",)), ] ) report = px.run(graph, strategy="async") @@ -262,7 +262,7 @@ def test_memory_backend_resume() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("a", make("a")), - px.TaskSpec("b", make("b"), ("a",)), + px.TaskSpec("b", make("b"), depends_on=("a",)), ] ) backend = MemoryBackend() @@ -402,7 +402,7 @@ def test_threaded_skips_cached_tasks() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("a", make("a")), - px.TaskSpec("b", make("b"), ("a",)), + px.TaskSpec("b", make("b"), depends_on=("a",)), ] ) backend = px.MemoryBackend() @@ -447,7 +447,7 @@ def test_async_skips_cached_tasks() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("a", a), - px.TaskSpec("b", b, ("a",)), + px.TaskSpec("b", b, depends_on=("a",)), ] ) backend = px.MemoryBackend() diff --git a/tests/test_graph.py b/tests/test_graph.py index ad268c3..6b2e3b6 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -16,8 +16,8 @@ def test_from_specs_builds_graph() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("a", _fn), - px.TaskSpec("b", _fn, ("a",)), - px.TaskSpec("c", _fn, ("a", "b")), + px.TaskSpec("b", _fn, depends_on=("a",)), + px.TaskSpec("c", _fn, depends_on=("a", "b")), ] ) assert set(graph.names) == {"a", "b", "c"} @@ -30,7 +30,7 @@ def test_from_specs_allows_forward_references() -> None: # b depends on a, but a is declared after b — order should not matter. graph = px.Graph.from_specs( [ - px.TaskSpec("b", _fn, ("a",)), + px.TaskSpec("b", _fn, depends_on=("a",)), px.TaskSpec("a", _fn), ] ) @@ -49,7 +49,7 @@ def test_duplicate_task_raises() -> None: def test_missing_dependency_raises() -> None: with pytest.raises(MissingDependencyError) as exc_info: - px.Graph.from_specs([px.TaskSpec("b", _fn, ("a",))]) + px.Graph.from_specs([px.TaskSpec("b", _fn, depends_on=("a",))]) assert exc_info.value.task == "b" assert exc_info.value.dependency == "a" @@ -58,9 +58,9 @@ def test_cycle_detection() -> None: with pytest.raises(CycleError): px.Graph.from_specs( [ - px.TaskSpec("a", _fn, ("c",)), - px.TaskSpec("b", _fn, ("a",)), - px.TaskSpec("c", _fn, ("b",)), + px.TaskSpec("a", _fn, depends_on=("c",)), + px.TaskSpec("b", _fn, depends_on=("a",)), + px.TaskSpec("c", _fn, depends_on=("b",)), ] ) @@ -70,8 +70,8 @@ def test_layers_grouping() -> None: [ px.TaskSpec("a", _fn), px.TaskSpec("b", _fn), - px.TaskSpec("c", _fn, ("a", "b")), - px.TaskSpec("d", _fn, ("c",)), + px.TaskSpec("c", _fn, depends_on=("a", "b")), + px.TaskSpec("d", _fn, depends_on=("c",)), ] ) layers = graph.layers() @@ -80,14 +80,14 @@ def test_layers_grouping() -> None: def test_self_dependency_rejected() -> None: with pytest.raises(ValueError): - px.TaskSpec("a", _fn, ("a",)) + px.TaskSpec("a", _fn, depends_on=("a",)) def test_to_mermaid() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("a", _fn), - px.TaskSpec("b", _fn, ("a",)), + px.TaskSpec("b", _fn, depends_on=("a",)), ] ) mermaid = graph.to_mermaid() @@ -106,8 +106,8 @@ def test_subgraph_by_tags() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("a", _fn, tags=("ingest",)), - px.TaskSpec("b", _fn, ("a",), tags=("ingest",)), - px.TaskSpec("c", _fn, ("b",), tags=("report",)), + px.TaskSpec("b", _fn, depends_on=("a",), tags=("ingest",)), + px.TaskSpec("c", _fn, depends_on=("b",), tags=("report",)), ] ) sub = graph.subgraph(["ingest"]) @@ -121,8 +121,8 @@ def test_subgraph_by_names() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("a", _fn), - px.TaskSpec("b", _fn, ("a",)), - px.TaskSpec("c", _fn, ("b",)), + px.TaskSpec("b", _fn, depends_on=("a",)), + px.TaskSpec("c", _fn, depends_on=("b",)), ] ) sub = graph.subgraph_by_names(["a", "b"]) @@ -141,7 +141,7 @@ def test_describe() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("a", _fn), - px.TaskSpec("b", _fn, ("a",)), + px.TaskSpec("b", _fn, depends_on=("a",)), ] ) desc = graph.describe() @@ -160,7 +160,7 @@ def test_add_chains_and_validates() -> None: assert "a" in graph # 缺失依赖应即时报错 with pytest.raises(MissingDependencyError): - graph.add(px.TaskSpec("b", _fn, ("missing",))) + graph.add(px.TaskSpec("b", _fn, depends_on=("missing",))) def test_add_duplicate_raises() -> None: @@ -189,7 +189,7 @@ def test_dependencies_accessor() -> None: graph = px.Graph.from_specs( [ px.TaskSpec("a", _fn), - px.TaskSpec("b", _fn, ("a",)), + px.TaskSpec("b", _fn, depends_on=("a",)), ] ) assert graph.dependencies("a") == () diff --git a/tests/test_runner.py b/tests/test_runner.py index 2a35990..c6fa953 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -9,7 +9,7 @@ from unittest.mock import patch import pytest import pyflowx as px -from pyflowx import CliExitCode, CliRunner +from pyflowx import CliExitCode, CliRunner, Strategy from pyflowx.errors import TaskFailedError # 跨平台的 echo 命令 @@ -29,20 +29,24 @@ def _echo_graph(name: str = "echo_task", msg: str = "hello") -> px.Graph: def _failing_graph() -> px.Graph: """构造一个必定失败的单任务图.""" - return px.Graph.from_specs([ - px.TaskSpec( - "fail", - cmd=["python", "-c", "import sys; sys.exit(1)"], - ) - ]) + return px.Graph.from_specs( + [ + px.TaskSpec( + "fail", + cmd=["python", "-c", "import sys; sys.exit(1)"], + ) + ] + ) def _multi_task_graph() -> px.Graph: """构造一个带依赖的多任务图.""" - return px.Graph.from_specs([ - px.TaskSpec("a", cmd=[*ECHO_CMD, "a"]), - px.TaskSpec("b", cmd=[*ECHO_CMD, "b"], depends_on=("a",)), - ]) + return px.Graph.from_specs( + [ + px.TaskSpec("a", cmd=[*ECHO_CMD, "a"]), + px.TaskSpec("b", cmd=[*ECHO_CMD, "b"], depends_on=("a",)), + ] + ) # ---------------------------------------------------------------------- # @@ -81,14 +85,39 @@ class TestCliRunnerConstruction: px.CliRunner(build=[1, 2, 3]) # type: ignore[arg-type] def test_default_strategy_is_sequential(self) -> None: - """默认策略应为 sequential.""" + """默认策略应为 Strategy.SEQUENTIAL.""" runner = px.CliRunner(clean=_echo_graph()) - assert runner.strategy == "sequential" + assert runner.strategy == Strategy.SEQUENTIAL - def test_custom_strategy(self) -> None: - """应支持自定义策略.""" + def test_custom_strategy_string(self) -> None: + """应支持通过字符串指定策略.""" runner = px.CliRunner(strategy="thread", clean=_echo_graph()) - assert runner.strategy == "thread" + assert runner.strategy == Strategy.THREAD + + def test_custom_strategy_enum(self) -> None: + """应支持通过 Strategy 枚举指定策略.""" + runner = px.CliRunner(strategy=Strategy.ASYNC, clean=_echo_graph()) + assert runner.strategy == Strategy.ASYNC + + def test_invalid_strategy_raises(self) -> None: + """非法策略字符串应抛出 ValueError.""" + with pytest.raises(ValueError, match="unknown strategy"): + px.CliRunner(strategy="invalid", clean=_echo_graph()) + + def test_invalid_strategy_type_raises(self) -> None: + """非法策略类型应抛出 TypeError.""" + with pytest.raises(TypeError, match="strategy must be"): + px.CliRunner(strategy=123, clean=_echo_graph()) # type: ignore[arg-type] + + def test_default_verbose_is_true(self) -> None: + """默认 verbose 应为 True.""" + runner = px.CliRunner(clean=_echo_graph()) + assert runner.verbose is True + + def test_custom_verbose_false(self) -> None: + """应支持关闭 verbose.""" + runner = px.CliRunner(verbose=False, clean=_echo_graph()) + assert runner.verbose is False def test_default_description_is_empty(self) -> None: """默认描述应为空字符串.""" @@ -196,6 +225,20 @@ class TestCliRunnerParser: parsed = parser.parse_args(["--list"]) assert parsed.list is True + def test_parser_has_quiet_flag(self) -> None: + """解析器应有 --quiet 标志.""" + runner = px.CliRunner(clean=_echo_graph()) + parser = runner.create_parser() + parsed = parser.parse_args(["clean", "--quiet"]) + assert parsed.quiet is True + + def test_parser_quiet_default_false(self) -> None: + """--quiet 默认为 False.""" + runner = px.CliRunner(clean=_echo_graph()) + parser = runner.create_parser() + parsed = parser.parse_args(["clean"]) + assert parsed.quiet is False + def test_format_commands_help_contains_all_commands(self) -> None: """帮助文本应包含所有命令.""" runner = px.CliRunner( @@ -214,13 +257,17 @@ class TestCliRunnerParser: class TestCliRunnerRunSuccess: """测试 CliRunner.run 的成功执行路径.""" - def test_run_valid_command_returns_zero(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_run_valid_command_returns_zero( + self, capsys: pytest.CaptureFixture[str] + ) -> None: """有效命令执行成功应返回 0.""" runner = px.CliRunner(echo=_echo_graph()) exit_code = runner.run(["echo"]) assert exit_code == CliExitCode.SUCCESS.value - def test_run_executes_correct_graph(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_run_executes_correct_graph( + self, capsys: pytest.CaptureFixture[str] + ) -> None: """应执行用户指定的命令对应的图.""" executed: List[str] = [] @@ -258,13 +305,101 @@ class TestCliRunnerRunSuccess: assert "Dry run" in captured.out +# ---------------------------------------------------------------------- # +# 执行: verbose 模式 +# ---------------------------------------------------------------------- # +class TestCliRunnerVerbose: + """测试 verbose 模式.""" + + def test_verbose_default_prints_lifecycle( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """默认 verbose=True 应打印任务生命周期.""" + runner = px.CliRunner(echo=_echo_graph()) + runner.run(["echo"]) + captured = capsys.readouterr() + # verbose 模式下应打印任务生命周期 + assert "[verbose]" in captured.out + + def test_quiet_flag_disables_verbose( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """--quiet 应关闭 verbose 输出.""" + runner = px.CliRunner(echo=_echo_graph()) + runner.run(["echo", "--quiet"]) + captured = capsys.readouterr() + # quiet 模式下不应有 [verbose] 前缀的输出 + assert "[verbose]" not in captured.out + + def test_verbose_false_constructor_disables_verbose( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """构造时 verbose=False 应关闭 verbose 输出.""" + runner = px.CliRunner(verbose=False, echo=_echo_graph()) + runner.run(["echo"]) + captured = capsys.readouterr() + assert "[verbose]" not in captured.out + + def test_verbose_prints_command_for_cmd_task( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """verbose 模式下 cmd 任务应打印执行的命令.""" + runner = px.CliRunner(echo=_echo_graph(msg="verbose-test")) + runner.run(["echo"]) + captured = capsys.readouterr() + # 应打印执行的命令 + assert "执行命令" in captured.out or "执行 Shell" in captured.out + # 应打印返回码 + assert "返回码" in captured.out + + def test_verbose_prints_success_lifecycle( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """verbose 模式下成功任务应打印成功信息.""" + runner = px.CliRunner(echo=_echo_graph()) + runner.run(["echo"]) + captured = capsys.readouterr() + assert "成功" in captured.out + + def test_verbose_prints_skip_lifecycle( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """verbose 模式下跳过的任务应打印跳过信息.""" + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "skip_me", + cmd=[*ECHO_CMD, "skip"], + conditions=(lambda: False,), + ), + ] + ) + runner = px.CliRunner(skip=graph) + runner.run(["skip"]) + captured = capsys.readouterr() + assert "跳过" in captured.out + + def test_verbose_prints_failure_lifecycle( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """verbose 模式下失败任务应打印失败信息.""" + runner = px.CliRunner(fail=_failing_graph()) + runner.run(["fail"]) + captured = capsys.readouterr() + # 失败信息可能出现在 stdout (verbose) 或 stderr (PyFlowXError) + combined = captured.out + captured.err + assert "失败" in combined or "错误" in combined + + # ---------------------------------------------------------------------- # # 执行: 失败路径 # ---------------------------------------------------------------------- # class TestCliRunnerRunFailure: """测试 CliRunner.run 的失败执行路径.""" - def test_run_unknown_command_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_run_unknown_command_returns_failure( + self, capsys: pytest.CaptureFixture[str] + ) -> None: """未知命令应返回 1 并打印错误.""" runner = px.CliRunner(clean=_echo_graph()) exit_code = runner.run(["unknown"]) @@ -273,7 +408,9 @@ class TestCliRunnerRunFailure: assert "未知命令" in captured.err assert "clean" in captured.err - def test_run_no_command_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_run_no_command_returns_failure( + self, capsys: pytest.CaptureFixture[str] + ) -> None: """无命令时应返回 1 并打印帮助.""" runner = px.CliRunner(clean=_echo_graph()) exit_code = runner.run([]) @@ -281,13 +418,17 @@ class TestCliRunnerRunFailure: captured = capsys.readouterr() assert "可用命令" in captured.out or "可用命令" in captured.err - def test_run_failing_task_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_run_failing_task_returns_failure( + self, capsys: pytest.CaptureFixture[str] + ) -> None: """任务失败时应返回 1.""" runner = px.CliRunner(fail=_failing_graph()) exit_code = runner.run(["fail"]) assert exit_code == CliExitCode.FAILURE.value - def test_run_failing_task_prints_error(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_run_failing_task_prints_error( + self, capsys: pytest.CaptureFixture[str] + ) -> None: """任务失败时应打印错误信息.""" runner = px.CliRunner(fail=_failing_graph()) runner.run(["fail"]) @@ -321,7 +462,9 @@ class TestCliRunnerList: assert "build" in captured.out assert "test" in captured.out - def test_list_does_not_execute_any_graph(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_list_does_not_execute_any_graph( + self, capsys: pytest.CaptureFixture[str] + ) -> None: """--list 不应执行任何图.""" executed: List[str] = [] @@ -339,27 +482,31 @@ class TestCliRunnerList: class TestCliRunnerErrorHandling: """测试错误处理.""" - def test_keyboard_interrupt_returns_130(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_keyboard_interrupt_returns_130( + self, capsys: pytest.CaptureFixture[str] + ) -> None: """KeyboardInterrupt 应返回 130.""" runner = px.CliRunner(echo=_echo_graph()) def raise_interrupt(*args: Any, **kwargs: Any) -> None: raise KeyboardInterrupt - with patch("pyflowx.runner.run", side_effect=raise_interrupt): + with patch("pyflowx.cli.runner.run", side_effect=raise_interrupt): exit_code = runner.run(["echo"]) assert exit_code == CliExitCode.INTERRUPTED.value captured = capsys.readouterr() assert "取消" in captured.err - def test_pyflowx_error_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_pyflowx_error_returns_failure( + self, capsys: pytest.CaptureFixture[str] + ) -> None: """PyFlowXError 应返回 1.""" runner = px.CliRunner(echo=_echo_graph()) def raise_error(*args: Any, **kwargs: Any) -> None: raise TaskFailedError("echo", RuntimeError("boom"), 1) - with patch("pyflowx.runner.run", side_effect=raise_error): + with patch("pyflowx.cli.runner.run", side_effect=raise_error): exit_code = runner.run(["echo"]) assert exit_code == CliExitCode.FAILURE.value captured = capsys.readouterr() @@ -376,7 +523,7 @@ class TestCliRunnerErrorHandling: def raise_custom(*args: Any, **kwargs: Any) -> None: raise CustomError("unexpected") - with patch("pyflowx.runner.run", side_effect=raise_custom): + with patch("pyflowx.cli.runner.run", side_effect=raise_custom): with pytest.raises(CustomError): runner.run(["echo"]) @@ -401,7 +548,9 @@ class TestCliRunnerRunCli: runner.run_cli(["fail"]) assert exc_info.value.code == CliExitCode.FAILURE.value - def test_run_cli_no_args_uses_sys_argv(self, monkeypatch: pytest.MonkeyPatch) -> None: + def test_run_cli_no_args_uses_sys_argv( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: """run_cli 无参数时应使用 sys.argv.""" monkeypatch.setattr(sys, "argv", ["pymake", "echo"]) runner = px.CliRunner(echo=_echo_graph()) @@ -438,26 +587,30 @@ class TestCliRunnerIntegration: def test_condition_skipped_command_succeeds(self) -> None: """条件不满足时任务跳过, 整体仍成功.""" - graph = px.Graph.from_specs([ - px.TaskSpec( - "skip_me", - cmd=[*ECHO_CMD, "should not run"], - conditions=(lambda: False,), - ), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "skip_me", + cmd=[*ECHO_CMD, "should not run"], + conditions=(lambda: False,), + ), + ] + ) runner = px.CliRunner(skip=graph) exit_code = runner.run(["skip"]) assert exit_code == CliExitCode.SUCCESS.value def test_condition_met_command_succeeds(self) -> None: """条件满足时任务执行, 整体成功.""" - graph = px.Graph.from_specs([ - px.TaskSpec( - "run_me", - cmd=[*ECHO_CMD, "should run"], - conditions=(lambda: True,), - ), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "run_me", + cmd=[*ECHO_CMD, "should run"], + conditions=(lambda: True,), + ), + ] + ) runner = px.CliRunner(run=graph) exit_code = runner.run(["run"]) assert exit_code == CliExitCode.SUCCESS.value @@ -473,12 +626,14 @@ class TestCliRunnerIntegration: return fn - graph = px.Graph.from_specs([ - px.TaskSpec("a", make("a")), - px.TaskSpec("b", make("b"), depends_on=("a",)), - px.TaskSpec("c", make("c"), depends_on=("a",)), - px.TaskSpec("d", make("d"), depends_on=("b", "c")), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("a", make("a")), + px.TaskSpec("b", make("b"), depends_on=("a",)), + px.TaskSpec("c", make("c"), depends_on=("a",)), + px.TaskSpec("d", make("d"), depends_on=("b", "c")), + ] + ) runner = px.CliRunner(diamond=graph) exit_code = runner.run(["diamond"]) assert exit_code == CliExitCode.SUCCESS.value @@ -488,7 +643,9 @@ class TestCliRunnerIntegration: """混合 fn 和 cmd 的命令应都能执行.""" runner = px.CliRunner( fn_cmd=px.Graph.from_specs([px.TaskSpec("fn", fn=lambda: "fn-result")]), - cmd_cmd=px.Graph.from_specs([px.TaskSpec("cmd", cmd=[*ECHO_CMD, "cmd-result"])]), + cmd_cmd=px.Graph.from_specs( + [px.TaskSpec("cmd", cmd=[*ECHO_CMD, "cmd-result"])] + ), ) assert runner.run(["fn_cmd"]) == CliExitCode.SUCCESS.value assert runner.run(["cmd_cmd"]) == CliExitCode.SUCCESS.value @@ -504,7 +661,9 @@ class TestCliRunnerIntegration: else: ls_cmd = ["ls"] - graph = px.Graph.from_specs([px.TaskSpec("ls", cmd=ls_cmd, cwd=Path(tmpdir))]) + graph = px.Graph.from_specs( + [px.TaskSpec("ls", cmd=ls_cmd, cwd=Path(tmpdir))] + ) runner = px.CliRunner(ls=graph) exit_code = runner.run(["ls"]) assert exit_code == CliExitCode.SUCCESS.value @@ -534,6 +693,22 @@ class TestCliRunnerExport: """CliExitCode 应在 __all__ 中.""" assert "CliExitCode" in px.__all__ + def test_strategy_exported_from_pyflowx(self) -> None: + """Strategy 应从 pyflowx 顶层导出.""" + assert hasattr(px, "Strategy") + assert px.Strategy is Strategy + + def test_strategy_in_all(self) -> None: + """Strategy 应在 __all__ 中.""" + assert "Strategy" in px.__all__ + + def test_strategy_members(self) -> None: + """Strategy 应有 SEQUENTIAL/THREAD/ASYNC 三个成员.""" + assert Strategy.SEQUENTIAL.value == "sequential" + assert Strategy.THREAD.value == "thread" + assert Strategy.ASYNC.value == "async" + assert len(list(Strategy)) == 3 + if __name__ == "__main__": pytest.main([__file__, "-v"])