diff --git a/src/pyflowx/__init__.py b/src/pyflowx/__init__.py index b4aaeda..9bda74d 100644 --- a/src/pyflowx/__init__.py +++ b/src/pyflowx/__init__.py @@ -60,6 +60,7 @@ from .conditions import ( IS_POSIX, IS_WINDOWS, ) +from .cli import CliExitCode, CliRunner from .context import Context, build_call_args, describe_injection from .errors import ( CycleError, @@ -91,6 +92,9 @@ __all__ = [ "RunReport", # 执行 "run", + # CLI 运行器 + "CliRunner", + "CliExitCode", # 状态后端 "StateBackend", "MemoryBackend", diff --git a/src/pyflowx/cli/__init__.py b/src/pyflowx/cli/__init__.py index e69de29..c551d2d 100644 --- a/src/pyflowx/cli/__init__.py +++ b/src/pyflowx/cli/__init__.py @@ -0,0 +1,3 @@ +from .runner import CliExitCode, CliRunner + +__all__ = ["CliRunner", "CliExitCode"] \ No newline at end of file diff --git a/src/pyflowx/cli/pymake.py b/src/pyflowx/cli/pymake.py index 3f58a2f..41e55a4 100644 --- a/src/pyflowx/cli/pymake.py +++ b/src/pyflowx/cli/pymake.py @@ -9,6 +9,7 @@ from __future__ import annotations from pathlib import Path import pyflowx as px +from pyflowx.conditions import BuiltinConditions, Constants class PymakeConfig: @@ -41,13 +42,355 @@ class PymakeConfig: DOC_BUILD_COMMAND: list[str] = ["sphinx-build", "-b", "html", "docs", "docs/_build"] # 清理 - DIRS_TO_IGNORE: list[str] = [".venv"] + DIRS_TO_IGNORE: list[str] = [".venv", ".git", ".tox"] PYTHON_BUILD_DIRS: list[str] = ["dist", "build", "*.egg-info", "src/*.egg-info"] conf = PymakeConfig() +def _get_maturin_build_command() -> list[str]: + """获取 maturin 构建命令(根据平台自动添加参数). + + Returns + ------- + list[str] + 完整的 maturin 构建命令列表. + """ + base_cmd = conf.MATURIN_BUILD_COMMAND.copy() + if Constants.IS_WINDOWS: + base_cmd.extend(conf.MATURIN_BUILD_OPTIONS_WIN7) + return base_cmd + + +# 命令条件判断 +_MATURIN_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.MATURIN_TOOL) +_PYTEST_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("pytest") +_UV_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.BUILD_TOOL) +_HATCH_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("hatch") +_RUFF_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("ruff") +_GIT_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("git") +_TOX_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("tox") + + +def _build_graphs() -> dict[str, px.Graph]: + """构建所有命令对应的任务流图. + + 将原本的 CommandScheduler/RunCommand 模式转换为 Graph/TaskSpec 模式, + 每个 Graph 是一个独立的任务流, 由 CliRunner 根据用户输入选择执行. + """ + return { + # === 构建命令 === + # 构建 Python 包 + "b": px.Graph.from_specs( + [ + px.TaskSpec( + "uv_build", + cmd=conf.BUILD_COMMAND, + conditions=(_UV_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), + # 构建 Rust 核心模块 + "bc": px.Graph.from_specs( + [ + px.TaskSpec( + "maturin_build", + cmd=_get_maturin_build_command(), + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), + # 构建双包(先 Rust 后 Python) + "ba": px.Graph.from_specs( + [ + px.TaskSpec( + "maturin_build", + cmd=_get_maturin_build_command(), + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + timeout=conf.TIMEOUT, + ), + px.TaskSpec( + "uv_build", + cmd=conf.BUILD_COMMAND, + conditions=(_UV_CONDITION,), + timeout=conf.TIMEOUT, + depends_on=("maturin_build",), + ), + ] + ), + # === 安装命令(开发模式) === + # 安装 Rust 核心模块 + "ic": px.Graph.from_specs( + [ + px.TaskSpec( + "maturin_dev", + cmd=conf.MATURIN_DEV_COMMAND, + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + ), + ] + ), + # 安装 Python 主包 + "ip": px.Graph.from_specs( + [ + px.TaskSpec( + "uv_install", + cmd=["uv", "pip", "install", "-e", "."], + conditions=(_UV_CONDITION,), + ), + ] + ), + # 安装双包(开发模式) + "ia": px.Graph.from_specs( + [ + px.TaskSpec( + "maturin_dev", + cmd=conf.MATURIN_DEV_COMMAND, + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + ), + px.TaskSpec( + "uv_install", + cmd=["uv", "pip", "install", "-e", "."], + conditions=(_UV_CONDITION,), + depends_on=("maturin_dev",), + ), + ] + ), + # === 清理命令 === + # 清理 Python 构建产物 + "cp": px.Graph.from_specs( + [ + px.TaskSpec( + "git_clean_python", + cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE], + conditions=(_GIT_CONDITION,), + ), + ] + ), + # 清理 Rust 构建产物 + "cc": px.Graph.from_specs( + [ + px.TaskSpec( + "cargo_clean", + cmd=["cargo", "clean"], + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + ), + ] + ), + # 清理所有构建产物 + "ca": px.Graph.from_specs( + [ + px.TaskSpec( + "cargo_clean", + cmd=["cargo", "clean"], + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + ), + px.TaskSpec( + "git_clean", + cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE], + conditions=(_GIT_CONDITION,), + ), + ] + ), + # === 开发工具 === + # 运行测试, 跳过 slow, 并行模式 + "t": px.Graph.from_specs( + [ + px.TaskSpec( + "pytest", + cmd=[ + "pytest", + "-m", + "not slow", + "-n", + "8", + "--dist", + "loadfile", + "--color=yes", + "--durations=10", + ], + conditions=(_PYTEST_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), + # 运行测试, 非并行模式 + "tf": px.Graph.from_specs( + [ + px.TaskSpec( + "pytest", + cmd=[ + "pytest", + "-m", + "not slow", + "--dist", + "loadfile", + "--color=yes", + "--durations=10", + ], + conditions=(_PYTEST_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), + # 运行测试并生成覆盖率报告, 跳过 slow, 并行模式 + "tc": px.Graph.from_specs( + [ + px.TaskSpec( + "pytest_cov", + cmd=[ + "pytest", + "-m", + "not slow", + "--cov", + "-n", + "auto", + "--dist", + "loadfile", + "--tb=short", + "-v", + "--color=yes", + "--durations=10", + ], + conditions=(_PYTEST_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), + # 代码格式化与检查 + "lint": px.Graph.from_specs( + [ + px.TaskSpec( + "ruff_check", + cmd=[ + "ruff", + "check", + "--fix", + "--unsafe-fixes", + ], + conditions=(_RUFF_CONDITION,), + timeout=conf.TIMEOUT, + cwd=Path(conf.PROJECT_ROOT), + ), + ] + ), + # 类型检查 + "typecheck": px.Graph.from_specs( + [ + px.TaskSpec( + "ty_check", + cmd=["ty", "check", "src/bitool"], + conditions=(BuiltinConditions.HAS_APP_INSTALLED("ty"),), + ), + ] + ), + # 构建文档 + "doc": px.Graph.from_specs( + [ + px.TaskSpec( + "sphinx_build", + cmd=conf.DOC_BUILD_COMMAND, + conditions=( + BuiltinConditions.HAS_APP_INSTALLED(conf.DOC_BUILD_TOOL), + ), + ), + ] + ), + # === 发布命令 === + # 发布 Python 主包到 PyPI + "pb": px.Graph.from_specs( + [ + px.TaskSpec( + "publish_python", + cmd=["hatch", "publish"], + cwd=Path(conf.PROJECT_ROOT), + conditions=(_HATCH_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), + # 发布所有包(先 Rust 后 Python) + "pba": px.Graph.from_specs( + [ + px.TaskSpec( + "publish_rust", + cmd=[ + "twine", + "upload", + "--disable-progress-bar", + conf.CORE_PATTERN, + ], + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + timeout=conf.TIMEOUT, + ), + px.TaskSpec( + "publish_python", + cmd=["hatch", "publish"], + cwd=Path(conf.PROJECT_ROOT), + conditions=(_HATCH_CONDITION,), + timeout=conf.TIMEOUT, + depends_on=("publish_rust",), + ), + ] + ), + # 发布 Rust 核心模块 (maturin publish) + "pbc": px.Graph.from_specs( + [ + px.TaskSpec( + "publish_rust", + cmd=["maturin", "publish"], + cwd=Path(conf.CORE_DIR), + conditions=(_MATURIN_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), + # === 多版本测试命令 === + # 运行多版本 Python 测试 (tox) + "tox": px.Graph.from_specs( + [ + px.TaskSpec( + "tox_run", + cmd=["tox", "-p", "auto"], + conditions=(_TOX_CONDITION,), + timeout=conf.TIMEOUT, + ), + ] + ), + # 安装多版本 Python (仅安装不测试) + "tox-install": px.Graph.from_specs( + [ + px.TaskSpec( + "uv_python_install", + cmd=[ + "uv", + "python", + "install", + "3.8", + "3.9", + "3.10", + "3.11", + "3.12", + "3.13", + "3.14", + ], + conditions=(_UV_CONDITION,), + timeout=600, + ), + ] + ), + } + + def main(): """ ╔══════════════════════════════════════════════════════════╗ @@ -81,7 +424,9 @@ def main(): pymake tox-install - 安装所有 Python 版本 (仅安装不测试) 📦 发布命令: - pymake pb - 发布到 PyPI (先 Rust 后 Python) + pymake pb - 发布到 PyPI (hatch publish) + pymake pba - 发布所有包 (先 Rust 后 Python) + pymake pbc - 发布 Rust 核心模块 (maturin publish) 💡 常用工作流: 1. 初始化开发环境: pymake ia @@ -96,349 +441,11 @@ def main(): pymake ia # 安装开发环境 pymake t # 运行测试 pymake tox # 多版本兼容性测试 - pymake lint # 格式化代码 + pymake lint # 格式化代码 pymake ca # 清理所有构建产物 """ - pymake_graph = px.Graph.from_specs( - [ - px.TaskSpec("b", cmd=conf.BUILD_COMMAND), - px.TaskSpec("bc", cmd=conf.MATURIN_BUILD_COMMAND), - px.TaskSpec("ic", cmd=conf.DOC_BUILD_COMMAND), - ] + runner = px.CliRunner( + description="PyMake - Python 构建工具 (替代 Makefile)", + **_build_graphs(), ) - px.run(pymake_graph) - - -# class PyMakeSkill(MapSkill): -# """PyMake 构建技能.""" - -# name: ClassVar[str] = "pymake" -# description: ClassVar[str] = "Bitool PyMake - Python构建工具" - -# @override -# def create_scheduler_map( -# self, -# args: argparse.Namespace, -# ) -> dict[str, CommandScheduler] | None: -# return { -# # === 构建命令 === -# # 构建 Python 包 -# "b": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=conf.BUILD_COMMAND, -# allow_conditions=[_UV_CONDITION], -# timeout=conf.TIMEOUT, -# ), -# ], -# ), -# # 构建 Rust 核心模块 -# "bc": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=_get_maturin_build_command(), -# cwd=Path(conf.CORE_DIR), -# allow_conditions=[_MATURIN_CONDITION], -# timeout=conf.TIMEOUT, -# ), -# ], -# ), -# # 构建双包(先 Rust 后 Python) -# "ba": CommandScheduler( -# commands=[ -# RunCommand( -# name="maturin_build", -# cmd=_get_maturin_build_command(), -# cwd=Path(conf.CORE_DIR), -# allow_conditions=[_MATURIN_CONDITION], -# timeout=conf.TIMEOUT, -# ), -# RunCommand( -# name="uv_build", -# cmd=conf.BUILD_COMMAND, -# allow_conditions=[_UV_CONDITION], -# timeout=conf.TIMEOUT, -# dependencies=["maturin_build"], -# ), -# ], -# ), -# # === 安装命令(开发模式) === -# # 安装 Rust 核心模块 -# "ic": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=conf.MATURIN_DEV_COMMAND, -# cwd=Path(conf.CORE_DIR), -# allow_conditions=[_MATURIN_CONDITION], -# ), -# ], -# ), -# # 安装 Python 主包 -# "ip": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=["uv", "pip", "install", "-e", "."], -# allow_conditions=[_UV_CONDITION], -# success_codes={0, 2}, -# ), -# ], -# ), -# # 安装双包(开发模式) -# "ia": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=conf.MATURIN_DEV_COMMAND, -# cwd=Path(conf.CORE_DIR), -# allow_conditions=[_MATURIN_CONDITION], -# ), -# RunCommand( -# cmd=["uv", "pip", "install", "-e", "."], -# allow_conditions=[_UV_CONDITION], -# success_codes={0, 2}, -# ), -# ], -# ), -# # === 清理命令 === -# # 清理 Python 构建产物 -# "cp": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=["rm", "-rf", *conf.PYTHON_BUILD_DIRS], -# allow_conditions=[_GIT_CONDITION], # 使用 git clean 更安全 -# ), -# ], -# ), -# # 清理 Rust 构建产物 -# "cc": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=["cargo", "clean"], -# cwd=Path(conf.CORE_DIR), -# allow_conditions=[ -# _MATURIN_CONDITION, -# ], # 有 maturin 说明有 cargo -# ), -# ], -# ), -# # 清理所有构建产物 -# "ca": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=["cargo", "clean"], -# cwd=Path(conf.CORE_DIR), -# allow_conditions=[_MATURIN_CONDITION], -# ), -# RunCommand( -# cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE], -# allow_conditions=[_GIT_CONDITION], -# ), -# ], -# ), -# # === 开发工具 === -# # 运行测试, 跳过 slow, 并行模式 -# "t": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=[ -# "pytest", -# "-m", -# "not slow", -# "-n", -# "8", -# "--dist", -# "loadfile", -# "--color=yes", -# "--durations=10", -# ], -# allow_conditions=[_PYTEST_CONDITION], -# timeout=conf.TIMEOUT, -# ), -# ], -# ), -# # 运行测试, 非并行模式 -# "tf": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=[ -# "pytest", -# "-m", -# "not slow", -# "--dist", -# "loadfile", -# "--color=yes", -# "--durations=10", -# ], -# allow_conditions=[_PYTEST_CONDITION], -# timeout=conf.TIMEOUT, -# ), -# ], -# ), -# # 运行测试并生成覆盖率报告, 跳过 slow, 并行模式 -# # --dist loadfile: 按文件分发测试, 减少模块导入开销 -# "tc": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=[ -# "pytest", -# "-m", -# "not slow", -# "--cov", -# "-n", -# "auto", -# "--dist", -# "loadfile", -# "--tb=short", -# "-v", -# "--color=yes", -# "--durations=10", -# ], -# allow_conditions=[_PYTEST_CONDITION], -# timeout=conf.TIMEOUT, -# ), -# ], -# ), -# # 代码格式化与检查 -# "lint": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=[ -# "ruff", -# "check", -# "--fix", -# "--unsafe-fixes", -# ], -# allow_conditions=[_RUFF_CONDITION], -# timeout=conf.TIMEOUT, -# cwd=Path(conf.PROJECT_ROOT), -# ), -# ], -# ), -# # 类型检查 -# "typecheck": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=["ty", "check", "src/bitool"], -# allow_conditions=[BuiltinConditions.HAS_APP_INSTALLED("ty")], -# ), -# ], -# ), -# # 构建文档 -# "doc": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=conf.DOC_BUILD_COMMAND, -# allow_conditions=[ -# BuiltinConditions.HAS_APP_INSTALLED(conf.DOC_BUILD_TOOL), -# ], -# ), -# ], -# ), -# # 发布到 PyPI(先发布 Rust 核心模块,再发布 Python 主包) -# "pb": CommandScheduler( -# commands=[ -# # 发布 Python 主包(在项目根目录执行,依赖 Rust 发布成功) -# RunCommand( -# name="publish-python", -# cmd=["hatch", "publish"], -# cwd=Path(conf.PROJECT_ROOT), -# allow_conditions=[_HATCH_CONDITION], -# timeout=conf.TIMEOUT, -# ), -# ], -# ), -# "pba": CommandScheduler( -# commands=[ -# # 发布 Rust 核心模块(在 core 目录执行) -# RunCommand( -# name="publish-rust", -# # --disable-progress-bar: 避免 Windows GBK 控制台渲染 rich 进度条 -# # 中的 \u2022 字符导致 UnicodeEncodeError -# cmd=[ -# "twine", -# "upload", -# "--disable-progress-bar", -# conf.CORE_PATTERN, -# ], -# cwd=Path(conf.CORE_DIR), -# allow_conditions=[_MATURIN_CONDITION], -# timeout=conf.TIMEOUT, -# ), -# RunCommand( -# name="publish-python", -# cmd=["hatch", "publish"], -# cwd=Path(conf.PROJECT_ROOT), -# allow_conditions=[_HATCH_CONDITION], -# timeout=conf.TIMEOUT, -# dependencies=["publish-rust"], -# ), -# ], -# ), -# "pbc": CommandScheduler( -# commands=[ -# # 发布 Rust 核心模块(在 core 目录执行) -# RunCommand( -# name="publish-rust", -# cmd=["maturin", "publish"], -# cwd=Path(conf.CORE_DIR), -# allow_conditions=[_MATURIN_CONDITION], -# timeout=conf.TIMEOUT, -# ), -# ], -# ), -# # === 多版本测试命令 === -# # 运行多版本 Python 测试 (tox) -# "tox": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=["tox", "-p", "auto"], -# allow_conditions=[_TOX_CONDITION], -# timeout=conf.TIMEOUT, -# ), -# ], -# ), -# # 安装多版本 Python (仅安装不测试) -# "tox-install": CommandScheduler( -# commands=[ -# RunCommand( -# cmd=[ -# "uv", -# "python", -# "install", -# "3.8", -# "3.9", -# "3.10", -# "3.11", -# "3.12", -# "3.13", -# "3.14", -# ], -# allow_conditions=[_UV_CONDITION], -# timeout=600, -# ), -# ], -# ), -# } - - -# def _get_maturin_build_command() -> list[str]: -# """获取 maturin 构建命令(根据平台自动添加参数). - -# Returns -# ------- -# list[str] -# 完整的 maturin 构建命令列表. -# """ -# base_cmd = conf.MATURIN_BUILD_COMMAND.copy() -# if Constants.IS_WINDOWS: -# base_cmd.extend(conf.MATURIN_BUILD_OPTIONS_WIN7) -# return base_cmd - - -# # 命令条件判断 -# _MATURIN_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.MATURIN_TOOL) -# _PYTEST_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("pytest") -# _UV_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.BUILD_TOOL) -# _HATCH_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("hatch") -# _RUFF_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("ruff") -# _GIT_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("git") -# _TOX_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("tox") + runner.run_cli() diff --git a/src/pyflowx/cli/runner.py b/src/pyflowx/cli/runner.py new file mode 100644 index 0000000..139b23f --- /dev/null +++ b/src/pyflowx/cli/runner.py @@ -0,0 +1,241 @@ +"""命令行运行器:根据用户输入执行对应的任务流图. + +参考 bitool_skill 的 MapSkill 设计, 将命令名映射到 Graph 实例, +通过 argparse 解析用户输入的命令并执行对应的图. + +与 bitool_skill.MapSkill 的区别: +- MapSkill 通过继承 + create_scheduler_map 构建命令映射 +- CliRunner 通过关键字参数直接注入命令到图的映射, 更声明式 +- CliRunner 复用 pyflowx 的 DAG 调度能力 (run/Graph/TaskSpec) +""" + +from __future__ import annotations + +import argparse +import enum +import sys +from typing import Dict, List, Optional, Sequence + +from ..errors import PyFlowXError +from ..executors import Strategy, run +from ..graph import Graph + +__all__ = ["CliRunner", "CliExitCode"] + + +class CliExitCode(enum.IntEnum): + """CliRunner 退出码.""" + + SUCCESS = 0 + FAILURE = 1 + INTERRUPTED = 130 # 与 POSIX 信号中断一致 + + +class CliRunner: + """命令行运行器: 根据用户输入执行对应的任务流图. + + 参考 bitool_skill 的 MapSkill 设计, 将命令名映射到 Graph 实例. + 通过 ``sys.argv`` 解析用户输入的命令, 执行对应的图. + + Parameters + ---------- + strategy : str + 默认执行策略 (``"sequential"`` / ``"thread"`` / ``"async"``). + 可被命令行 ``--strategy`` 覆盖. + description : str + CLI 描述文本, 显示在 ``--help`` 中. + **graphs : Graph + 命令名到图的映射. 每个 key 是一个命令名, value 是对应的 + :class:`~pyflowx.graph.Graph`. + + Examples + -------- + 基本用法:: + + runner = px.CliRunner( + clean=px.Graph.from_specs([ + px.TaskSpec("cargo_clean", cmd=["cargo", "clean"]), + ]), + build=px.Graph.from_specs([ + px.TaskSpec("uv_build", cmd=["uv", "build"]), + ]), + ) + runner.run() # 解析 sys.argv + + 指定策略与描述:: + + runner = px.CliRunner( + strategy="thread", + description="My build tool", + test=px.Graph.from_specs([...]), + ) + runner.run(["test", "--strategy", "sequential"]) + """ + + def __init__( + self, + *, + strategy: Strategy = "sequential", + description: str = "", + **graphs: Graph, + ) -> None: + if not graphs: + raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)") + # 校验所有值都是 Graph + for name, graph in graphs.items(): + if not isinstance(graph, Graph): + raise TypeError(f"CliRunner 命令 {name!r} 的值必须是 Graph 实例, 实际是 {type(graph).__name__}") + self._graphs: Dict[str, Graph] = dict(graphs) + self._strategy: Strategy = strategy + self._description: str = description + + # ------------------------------------------------------------------ # + # 内省 + # ------------------------------------------------------------------ # + @property + def commands(self) -> List[str]: + """可用的命令列表 (按插入顺序).""" + return list(self._graphs.keys()) + + @property + def graphs(self) -> Dict[str, Graph]: + """命令名到图的映射 (只读副本).""" + return dict(self._graphs) + + @property + def strategy(self) -> Strategy: + """默认执行策略.""" + return self._strategy + + @property + def description(self) -> str: + """CLI 描述文本.""" + return self._description + + # ------------------------------------------------------------------ # + # 参数解析 + # ------------------------------------------------------------------ # + def _prog_name(self) -> str: + """从 sys.argv[0] 推导程序名.""" + import os + + return os.path.basename(sys.argv[0]) if sys.argv else "pyflowx" + + def create_parser(self) -> argparse.ArgumentParser: + """创建参数解析器. + + 子类可覆盖此方法以添加自定义参数. 覆盖时应保留 ``command`` + 位置参数与 ``--strategy`` / ``--dry-run`` / ``--list`` 选项, + 否则 :meth:`run` 的默认逻辑可能失效. + + Returns + ------- + argparse.ArgumentParser + 新创建的参数解析器实例. + """ + parser = argparse.ArgumentParser( + prog=self._prog_name(), + description=self._description or "PyFlowX CLI Runner", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=self._format_commands_help(), + ) + parser.add_argument( + "command", + nargs="?", + help="要执行的命令", + ) + parser.add_argument( + "--strategy", + choices=["sequential", "thread", "async"], + default=self._strategy, + help="执行策略 (默认: %(default)s)", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="只打印执行计划, 不实际运行", + ) + parser.add_argument( + "--list", + action="store_true", + help="列出所有可用命令", + ) + return parser + + def _format_commands_help(self) -> str: + """格式化命令帮助文本.""" + lines = ["可用命令:"] + for cmd in self._graphs: + lines.append(f" {cmd}") + return "\n".join(lines) + + # ------------------------------------------------------------------ # + # 执行 + # ------------------------------------------------------------------ # + def run(self, args: Optional[Sequence[str]] = None) -> int: + """解析参数并执行对应的图. + + Parameters + ---------- + args : Sequence[str] | None + 参数列表, 默认使用 ``sys.argv[1:]``. + + Returns + ------- + int + 退出码 (0 成功, 1 失败, 130 中断). + + Raises + ------ + SystemExit + 当 argparse 无法解析参数时 (与标准 argparse 行为一致). + """ + parser = self.create_parser() + parsed = parser.parse_args(args) + + # --list: 列出命令 + if parsed.list: + print(self._format_commands_help()) + return CliExitCode.SUCCESS.value + + # 无命令: 显示帮助 + if not parsed.command: + parser.print_help() + return CliExitCode.FAILURE.value + + # 验证命令 + if parsed.command not in self._graphs: + available = ", ".join(self._graphs.keys()) + print( + f"错误: 未知命令 {parsed.command!r} (可用命令: {available})", + file=sys.stderr, + ) + return CliExitCode.FAILURE.value + + # 执行对应的图 + graph = self._graphs[parsed.command] + try: + report = run( + graph, + strategy=parsed.strategy, + dry_run=parsed.dry_run, + ) + return CliExitCode.SUCCESS.value if report.success else CliExitCode.FAILURE.value + except KeyboardInterrupt: + print("\n操作已取消", file=sys.stderr) + return CliExitCode.INTERRUPTED.value + except PyFlowXError as e: + print(f"错误: {e}", file=sys.stderr) + return CliExitCode.FAILURE.value + + def run_cli(self, args: Optional[Sequence[str]] = None) -> None: + """运行并以退出码退出进程. + + 作为 CLI 工具运行时的入口点, 等价于 ``sys.exit(self.run(args))``. + + Parameters + ---------- + args : Sequence[str] | None + 参数列表, 默认使用 ``sys.argv[1:]``. + """ + sys.exit(self.run(args)) diff --git a/tests/test_cli_runner.py b/tests/test_cli_runner.py new file mode 100644 index 0000000..339c545 --- /dev/null +++ b/tests/test_cli_runner.py @@ -0,0 +1,573 @@ +"""Tests for CliRunner: command dispatch, argument parsing, exit codes.""" + +from __future__ import annotations + +import sys +from typing import Any, List +from unittest.mock import patch + +import pytest + +import pyflowx as px +from pyflowx.cli import CliExitCode, CliRunner +from pyflowx.errors import PyFlowXError, TaskFailedError + +# 跨平台的 echo 命令 +if sys.platform == "win32": + ECHO_CMD = ["cmd", "/c", "echo"] +else: + ECHO_CMD = ["echo"] + + +# ---------------------------------------------------------------------- # +# 辅助工厂 +# ---------------------------------------------------------------------- # +def _echo_graph(name: str = "echo_task", msg: str = "hello") -> px.Graph: + """构造一个单任务 echo 图, 用于执行成功场景.""" + return px.Graph.from_specs([px.TaskSpec(name, cmd=[*ECHO_CMD, msg])]) + + +def _failing_graph() -> px.Graph: + """构造一个必定失败的单任务图.""" + return px.Graph.from_specs( + [ + px.TaskSpec( + "fail", + cmd=["python", "-c", "import sys; sys.exit(1)"], + ) + ] + ) + + +def _multi_task_graph() -> px.Graph: + """构造一个带依赖的多任务图.""" + return px.Graph.from_specs( + [ + px.TaskSpec("a", cmd=[*ECHO_CMD, "a"]), + px.TaskSpec("b", cmd=[*ECHO_CMD, "b"], depends_on=("a",)), + ] + ) + + +# ---------------------------------------------------------------------- # +# 构造与校验 +# ---------------------------------------------------------------------- # +class TestCliRunnerConstruction: + """测试 CliRunner 的构造与参数校验.""" + + def test_requires_at_least_one_command(self) -> None: + """没有命令时应抛出 ValueError.""" + with pytest.raises(ValueError, match="至少需要一个命令"): + px.CliRunner() + + def test_accepts_single_graph(self) -> None: + """单个命令应正常构造.""" + runner = px.CliRunner(clean=_echo_graph()) + assert runner.commands == ["clean"] + + def test_accepts_multiple_graphs(self) -> None: + """多个命令应按插入顺序保留.""" + runner = px.CliRunner( + clean=_echo_graph("c", "clean"), + build=_echo_graph("b", "build"), + test=_echo_graph("t", "test"), + ) + assert runner.commands == ["clean", "build", "test"] + + def test_rejects_non_graph_value(self) -> None: + """非 Graph 值应抛出 TypeError.""" + with pytest.raises(TypeError, match="必须是 Graph 实例"): + px.CliRunner(clean="not a graph") # type: ignore[arg-type] + + def test_rejects_non_graph_list(self) -> None: + """列表类型的值应抛出 TypeError.""" + with pytest.raises(TypeError, match="必须是 Graph 实例"): + px.CliRunner(build=[1, 2, 3]) # type: ignore[arg-type] + + def test_default_strategy_is_sequential(self) -> None: + """默认策略应为 sequential.""" + runner = px.CliRunner(clean=_echo_graph()) + assert runner.strategy == "sequential" + + def test_custom_strategy(self) -> None: + """应支持自定义策略.""" + runner = px.CliRunner(strategy="thread", clean=_echo_graph()) + assert runner.strategy == "thread" + + def test_default_description_is_empty(self) -> None: + """默认描述应为空字符串.""" + runner = px.CliRunner(clean=_echo_graph()) + assert runner.description == "" + + def test_custom_description(self) -> None: + """应支持自定义描述.""" + runner = px.CliRunner(description="My CLI", clean=_echo_graph()) + assert runner.description == "My CLI" + + +# ---------------------------------------------------------------------- # +# 属性与内省 +# ---------------------------------------------------------------------- # +class TestCliRunnerProperties: + """测试 CliRunner 的属性访问.""" + + def test_commands_returns_list(self) -> None: + """commands 应返回列表.""" + runner = px.CliRunner(a=_echo_graph(), b=_echo_graph()) + assert isinstance(runner.commands, list) + + def test_graphs_returns_copy(self) -> None: + """graphs 应返回副本, 修改不影响内部状态.""" + runner = px.CliRunner(a=_echo_graph(), b=_echo_graph()) + graphs = runner.graphs + graphs["c"] = _echo_graph() + assert "c" not in runner.graphs + + def test_graphs_contains_original_graphs(self) -> None: + """graphs 应包含原始 Graph 实例.""" + g = _echo_graph() + runner = px.CliRunner(cmd=g) + assert runner.graphs["cmd"] is g + + +# ---------------------------------------------------------------------- # +# 参数解析 +# ---------------------------------------------------------------------- # +class TestCliRunnerParser: + """测试参数解析器.""" + + def test_create_parser_returns_argument_parser(self) -> None: + """create_parser 应返回 ArgumentParser.""" + from argparse import ArgumentParser + + runner = px.CliRunner(clean=_echo_graph()) + parser = runner.create_parser() + assert isinstance(parser, ArgumentParser) + + def test_parser_has_command_argument(self) -> None: + """解析器应有 command 位置参数.""" + runner = px.CliRunner(clean=_echo_graph()) + parser = runner.create_parser() + parsed = parser.parse_args(["clean"]) + assert parsed.command == "clean" + + def test_parser_command_is_optional(self) -> None: + """command 应为可选参数.""" + runner = px.CliRunner(clean=_echo_graph()) + parser = runner.create_parser() + parsed = parser.parse_args([]) + assert parsed.command is None + + def test_parser_has_strategy_option(self) -> None: + """解析器应有 --strategy 选项.""" + runner = px.CliRunner(clean=_echo_graph()) + parser = runner.create_parser() + parsed = parser.parse_args(["clean", "--strategy", "thread"]) + assert parsed.strategy == "thread" + + def test_parser_strategy_default(self) -> None: + """--strategy 默认值应与构造时一致.""" + runner = px.CliRunner(strategy="async", clean=_echo_graph()) + parser = runner.create_parser() + parsed = parser.parse_args(["clean"]) + assert parsed.strategy == "async" + + def test_parser_strategy_invalid_choice(self) -> None: + """--strategy 不接受非法值.""" + runner = px.CliRunner(clean=_echo_graph()) + parser = runner.create_parser() + with pytest.raises(SystemExit): + parser.parse_args(["clean", "--strategy", "invalid"]) + + def test_parser_has_dry_run_flag(self) -> None: + """解析器应有 --dry-run 标志.""" + runner = px.CliRunner(clean=_echo_graph()) + parser = runner.create_parser() + parsed = parser.parse_args(["clean", "--dry-run"]) + assert parsed.dry_run is True + + def test_parser_dry_run_default_false(self) -> None: + """--dry-run 默认为 False.""" + runner = px.CliRunner(clean=_echo_graph()) + parser = runner.create_parser() + parsed = parser.parse_args(["clean"]) + assert parsed.dry_run is False + + def test_parser_has_list_flag(self) -> None: + """解析器应有 --list 标志.""" + runner = px.CliRunner(clean=_echo_graph()) + parser = runner.create_parser() + parsed = parser.parse_args(["--list"]) + assert parsed.list is True + + def test_format_commands_help_contains_all_commands(self) -> None: + """帮助文本应包含所有命令.""" + runner = px.CliRunner( + clean=_echo_graph("c", "clean"), + build=_echo_graph("b", "build"), + ) + help_text = runner._format_commands_help() + assert "clean" in help_text + assert "build" in help_text + assert "可用命令" in help_text + + +# ---------------------------------------------------------------------- # +# 执行: 成功路径 +# ---------------------------------------------------------------------- # +class TestCliRunnerRunSuccess: + """测试 CliRunner.run 的成功执行路径.""" + + def test_run_valid_command_returns_zero( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """有效命令执行成功应返回 0.""" + runner = px.CliRunner(echo=_echo_graph()) + exit_code = runner.run(["echo"]) + assert exit_code == CliExitCode.SUCCESS.value + + def test_run_executes_correct_graph( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """应执行用户指定的命令对应的图.""" + executed: List[str] = [] + + def track_a() -> None: + executed.append("a") + + def track_b() -> None: + executed.append("b") + + runner = px.CliRunner( + a=px.Graph.from_specs([px.TaskSpec("a", track_a)]), + b=px.Graph.from_specs([px.TaskSpec("b", track_b)]), + ) + runner.run(["b"]) + assert executed == ["b"] + + def test_run_multi_task_graph(self) -> None: + """应能执行带依赖的多任务图.""" + runner = px.CliRunner(multi=_multi_task_graph()) + exit_code = runner.run(["multi"]) + assert exit_code == CliExitCode.SUCCESS.value + + def test_run_with_strategy_override(self) -> None: + """应支持通过 --strategy 覆盖默认策略.""" + runner = px.CliRunner(strategy="sequential", echo=_echo_graph()) + exit_code = runner.run(["echo", "--strategy", "thread"]) + assert exit_code == CliExitCode.SUCCESS.value + + def test_run_with_dry_run(self, capsys: pytest.CaptureFixture[str]) -> None: + """--dry-run 应只打印计划不执行.""" + runner = px.CliRunner(echo=_echo_graph()) + exit_code = runner.run(["echo", "--dry-run"]) + assert exit_code == CliExitCode.SUCCESS.value + captured = capsys.readouterr() + assert "Dry run" in captured.out + + +# ---------------------------------------------------------------------- # +# 执行: 失败路径 +# ---------------------------------------------------------------------- # +class TestCliRunnerRunFailure: + """测试 CliRunner.run 的失败执行路径.""" + + def test_run_unknown_command_returns_failure( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """未知命令应返回 1 并打印错误.""" + runner = px.CliRunner(clean=_echo_graph()) + exit_code = runner.run(["unknown"]) + assert exit_code == CliExitCode.FAILURE.value + captured = capsys.readouterr() + assert "未知命令" in captured.err + assert "clean" in captured.err + + def test_run_no_command_returns_failure( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """无命令时应返回 1 并打印帮助.""" + runner = px.CliRunner(clean=_echo_graph()) + exit_code = runner.run([]) + assert exit_code == CliExitCode.FAILURE.value + captured = capsys.readouterr() + assert "可用命令" in captured.out or "可用命令" in captured.err + + def test_run_failing_task_returns_failure( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """任务失败时应返回 1.""" + runner = px.CliRunner(fail=_failing_graph()) + exit_code = runner.run(["fail"]) + assert exit_code == CliExitCode.FAILURE.value + + def test_run_failing_task_prints_error( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """任务失败时应打印错误信息.""" + runner = px.CliRunner(fail=_failing_graph()) + runner.run(["fail"]) + captured = capsys.readouterr() + # PyFlowXError 信息应输出到 stderr + assert "错误" in captured.err or "失败" in captured.err + + +# ---------------------------------------------------------------------- # +# 执行: --list 选项 +# ---------------------------------------------------------------------- # +class TestCliRunnerList: + """测试 --list 选项.""" + + def test_list_returns_success(self, capsys: pytest.CaptureFixture[str]) -> None: + """--list 应返回 0.""" + runner = px.CliRunner(clean=_echo_graph(), build=_echo_graph()) + exit_code = runner.run(["--list"]) + assert exit_code == CliExitCode.SUCCESS.value + + def test_list_prints_all_commands(self, capsys: pytest.CaptureFixture[str]) -> None: + """--list 应打印所有命令.""" + runner = px.CliRunner( + clean=_echo_graph("c", "clean"), + build=_echo_graph("b", "build"), + test=_echo_graph("t", "test"), + ) + runner.run(["--list"]) + captured = capsys.readouterr() + assert "clean" in captured.out + assert "build" in captured.out + assert "test" in captured.out + + def test_list_does_not_execute_any_graph( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """--list 不应执行任何图.""" + executed: List[str] = [] + + def track() -> None: + executed.append("ran") + + runner = px.CliRunner(a=px.Graph.from_specs([px.TaskSpec("a", track)])) + runner.run(["--list"]) + assert executed == [] + + +# ---------------------------------------------------------------------- # +# 错误处理 +# ---------------------------------------------------------------------- # +class TestCliRunnerErrorHandling: + """测试错误处理.""" + + def test_keyboard_interrupt_returns_130( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """KeyboardInterrupt 应返回 130.""" + runner = px.CliRunner(echo=_echo_graph()) + + def raise_interrupt(*args: Any, **kwargs: Any) -> None: + raise KeyboardInterrupt + + with patch("pyflowx.cli.runner.run", side_effect=raise_interrupt): + exit_code = runner.run(["echo"]) + assert exit_code == CliExitCode.INTERRUPTED.value + captured = capsys.readouterr() + assert "取消" in captured.err + + def test_pyflowx_error_returns_failure( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """PyFlowXError 应返回 1.""" + runner = px.CliRunner(echo=_echo_graph()) + + def raise_error(*args: Any, **kwargs: Any) -> None: + raise TaskFailedError("echo", RuntimeError("boom"), 1) + + with patch("pyflowx.cli.runner.run", side_effect=raise_error): + exit_code = runner.run(["echo"]) + assert exit_code == CliExitCode.FAILURE.value + captured = capsys.readouterr() + assert "错误" in captured.err + + def test_generic_exception_propagates(self) -> None: + """非 PyFlowXError 的异常应向上传播.""" + + class CustomError(Exception): + pass + + runner = px.CliRunner(echo=_echo_graph()) + + def raise_custom(*args: Any, **kwargs: Any) -> None: + raise CustomError("unexpected") + + with patch("pyflowx.cli.runner.run", side_effect=raise_custom): + with pytest.raises(CustomError): + runner.run(["echo"]) + + +# ---------------------------------------------------------------------- # +# run_cli +# ---------------------------------------------------------------------- # +class TestCliRunnerRunCli: + """测试 run_cli 方法.""" + + def test_run_cli_calls_sys_exit(self) -> None: + """run_cli 应调用 sys.exit.""" + runner = px.CliRunner(echo=_echo_graph()) + with pytest.raises(SystemExit) as exc_info: + runner.run_cli(["echo"]) + assert exc_info.value.code == CliExitCode.SUCCESS.value + + def test_run_cli_exit_code_on_failure(self) -> None: + """run_cli 失败时应以非零码退出.""" + runner = px.CliRunner(fail=_failing_graph()) + with pytest.raises(SystemExit) as exc_info: + runner.run_cli(["fail"]) + assert exc_info.value.code == CliExitCode.FAILURE.value + + def test_run_cli_no_args_uses_sys_argv( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """run_cli 无参数时应使用 sys.argv.""" + monkeypatch.setattr(sys, "argv", ["pymake", "echo"]) + runner = px.CliRunner(echo=_echo_graph()) + with pytest.raises(SystemExit) as exc_info: + runner.run_cli() + assert exc_info.value.code == CliExitCode.SUCCESS.value + + +# ---------------------------------------------------------------------- # +# 退出码枚举 +# ---------------------------------------------------------------------- # +class TestCliExitCode: + """测试 CliExitCode 枚举.""" + + def test_success_is_zero(self) -> None: + assert CliExitCode.SUCCESS.value == 0 + + def test_failure_is_one(self) -> None: + assert CliExitCode.FAILURE.value == 1 + + def test_interrupted_is_130(self) -> None: + assert CliExitCode.INTERRUPTED.value == 130 + + def test_exit_codes_are_distinct(self) -> None: + values = {e.value for e in CliExitCode} + assert len(values) == 3 + + +# ---------------------------------------------------------------------- # +# 集成测试 +# ---------------------------------------------------------------------- # +class TestCliRunnerIntegration: + """集成测试: CliRunner + Graph + TaskSpec + 条件.""" + + def test_condition_skipped_command_succeeds(self) -> None: + """条件不满足时任务跳过, 整体仍成功.""" + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "skip_me", + cmd=[*ECHO_CMD, "should not run"], + conditions=(lambda: False,), + ), + ] + ) + runner = px.CliRunner(skip=graph) + exit_code = runner.run(["skip"]) + assert exit_code == CliExitCode.SUCCESS.value + + def test_condition_met_command_succeeds(self) -> None: + """条件满足时任务执行, 整体成功.""" + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "run_me", + cmd=[*ECHO_CMD, "should run"], + conditions=(lambda: True,), + ), + ] + ) + runner = px.CliRunner(run=graph) + exit_code = runner.run(["run"]) + assert exit_code == CliExitCode.SUCCESS.value + + def test_diamond_dependency_graph(self) -> None: + """菱形依赖图应正确执行.""" + order: List[str] = [] + + def make(name: str) -> Any: + def fn() -> str: + order.append(name) + return name + + return fn + + graph = px.Graph.from_specs( + [ + px.TaskSpec("a", make("a")), + px.TaskSpec("b", make("b"), depends_on=("a",)), + px.TaskSpec("c", make("c"), depends_on=("a",)), + px.TaskSpec("d", make("d"), depends_on=("b", "c")), + ] + ) + runner = px.CliRunner(diamond=graph) + exit_code = runner.run(["diamond"]) + assert exit_code == CliExitCode.SUCCESS.value + assert order == ["a", "b", "c", "d"] + + def test_mixed_fn_and_cmd_commands(self) -> None: + """混合 fn 和 cmd 的命令应都能执行.""" + runner = px.CliRunner( + fn_cmd=px.Graph.from_specs([px.TaskSpec("fn", fn=lambda: "fn-result")]), + cmd_cmd=px.Graph.from_specs( + [px.TaskSpec("cmd", cmd=[*ECHO_CMD, "cmd-result"])] + ), + ) + assert runner.run(["fn_cmd"]) == CliExitCode.SUCCESS.value + assert runner.run(["cmd_cmd"]) == CliExitCode.SUCCESS.value + + def test_command_with_cwd(self) -> None: + """带 cwd 的命令应正确执行.""" + import tempfile + from pathlib import Path + + with tempfile.TemporaryDirectory() as tmpdir: + if sys.platform == "win32": + ls_cmd = ["cmd", "/c", "dir"] + else: + ls_cmd = ["ls"] + + graph = px.Graph.from_specs( + [px.TaskSpec("ls", cmd=ls_cmd, cwd=Path(tmpdir))] + ) + runner = px.CliRunner(ls=graph) + exit_code = runner.run(["ls"]) + assert exit_code == CliExitCode.SUCCESS.value + + +# ---------------------------------------------------------------------- # +# 导出测试 +# ---------------------------------------------------------------------- # +class TestCliRunnerExport: + """测试 CliRunner 从顶层包导出.""" + + def test_cli_runner_exported_from_pyflowx(self) -> None: + """CliRunner 应从 pyflowx 顶层导出.""" + assert hasattr(px, "CliRunner") + assert px.CliRunner is CliRunner + + def test_cli_exit_code_exported_from_pyflowx(self) -> None: + """CliExitCode 应从 pyflowx 顶层导出.""" + assert hasattr(px, "CliExitCode") + assert px.CliExitCode is CliExitCode + + def test_cli_runner_in_all(self) -> None: + """CliRunner 应在 __all__ 中.""" + assert "CliRunner" in px.__all__ + + def test_cli_exit_code_in_all(self) -> None: + """CliExitCode 应在 __all__ 中.""" + assert "CliExitCode" in px.__all__ + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])