+cli runner

This commit is contained in:
2026-06-20 16:52:48 +08:00
parent fad964b370
commit 4de55336f1
5 changed files with 1173 additions and 345 deletions
+4
View File
@@ -60,6 +60,7 @@ from .conditions import (
IS_POSIX,
IS_WINDOWS,
)
from .cli import CliExitCode, CliRunner
from .context import Context, build_call_args, describe_injection
from .errors import (
CycleError,
@@ -91,6 +92,9 @@ __all__ = [
"RunReport",
# 执行
"run",
# CLI 运行器
"CliRunner",
"CliExitCode",
# 状态后端
"StateBackend",
"MemoryBackend",
+3
View File
@@ -0,0 +1,3 @@
from .runner import CliExitCode, CliRunner
__all__ = ["CliRunner", "CliExitCode"]
+352 -345
View File
@@ -9,6 +9,7 @@ from __future__ import annotations
from pathlib import Path
import pyflowx as px
from pyflowx.conditions import BuiltinConditions, Constants
class PymakeConfig:
@@ -41,13 +42,355 @@ class PymakeConfig:
DOC_BUILD_COMMAND: list[str] = ["sphinx-build", "-b", "html", "docs", "docs/_build"]
# 清理
DIRS_TO_IGNORE: list[str] = [".venv"]
DIRS_TO_IGNORE: list[str] = [".venv", ".git", ".tox"]
PYTHON_BUILD_DIRS: list[str] = ["dist", "build", "*.egg-info", "src/*.egg-info"]
conf = PymakeConfig()
def _get_maturin_build_command() -> list[str]:
"""获取 maturin 构建命令(根据平台自动添加参数).
Returns
-------
list[str]
完整的 maturin 构建命令列表.
"""
base_cmd = conf.MATURIN_BUILD_COMMAND.copy()
if Constants.IS_WINDOWS:
base_cmd.extend(conf.MATURIN_BUILD_OPTIONS_WIN7)
return base_cmd
# 命令条件判断
_MATURIN_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.MATURIN_TOOL)
_PYTEST_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("pytest")
_UV_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.BUILD_TOOL)
_HATCH_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("hatch")
_RUFF_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("ruff")
_GIT_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("git")
_TOX_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("tox")
def _build_graphs() -> dict[str, px.Graph]:
"""构建所有命令对应的任务流图.
将原本的 CommandScheduler/RunCommand 模式转换为 Graph/TaskSpec 模式,
每个 Graph 是一个独立的任务流, 由 CliRunner 根据用户输入选择执行.
"""
return {
# === 构建命令 ===
# 构建 Python 包
"b": px.Graph.from_specs(
[
px.TaskSpec(
"uv_build",
cmd=conf.BUILD_COMMAND,
conditions=(_UV_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 构建 Rust 核心模块
"bc": px.Graph.from_specs(
[
px.TaskSpec(
"maturin_build",
cmd=_get_maturin_build_command(),
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 构建双包(先 Rust 后 Python
"ba": px.Graph.from_specs(
[
px.TaskSpec(
"maturin_build",
cmd=_get_maturin_build_command(),
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
px.TaskSpec(
"uv_build",
cmd=conf.BUILD_COMMAND,
conditions=(_UV_CONDITION,),
timeout=conf.TIMEOUT,
depends_on=("maturin_build",),
),
]
),
# === 安装命令(开发模式) ===
# 安装 Rust 核心模块
"ic": px.Graph.from_specs(
[
px.TaskSpec(
"maturin_dev",
cmd=conf.MATURIN_DEV_COMMAND,
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
),
]
),
# 安装 Python 主包
"ip": px.Graph.from_specs(
[
px.TaskSpec(
"uv_install",
cmd=["uv", "pip", "install", "-e", "."],
conditions=(_UV_CONDITION,),
),
]
),
# 安装双包(开发模式)
"ia": px.Graph.from_specs(
[
px.TaskSpec(
"maturin_dev",
cmd=conf.MATURIN_DEV_COMMAND,
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
),
px.TaskSpec(
"uv_install",
cmd=["uv", "pip", "install", "-e", "."],
conditions=(_UV_CONDITION,),
depends_on=("maturin_dev",),
),
]
),
# === 清理命令 ===
# 清理 Python 构建产物
"cp": px.Graph.from_specs(
[
px.TaskSpec(
"git_clean_python",
cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE],
conditions=(_GIT_CONDITION,),
),
]
),
# 清理 Rust 构建产物
"cc": px.Graph.from_specs(
[
px.TaskSpec(
"cargo_clean",
cmd=["cargo", "clean"],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
),
]
),
# 清理所有构建产物
"ca": px.Graph.from_specs(
[
px.TaskSpec(
"cargo_clean",
cmd=["cargo", "clean"],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
),
px.TaskSpec(
"git_clean",
cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE],
conditions=(_GIT_CONDITION,),
),
]
),
# === 开发工具 ===
# 运行测试, 跳过 slow, 并行模式
"t": px.Graph.from_specs(
[
px.TaskSpec(
"pytest",
cmd=[
"pytest",
"-m",
"not slow",
"-n",
"8",
"--dist",
"loadfile",
"--color=yes",
"--durations=10",
],
conditions=(_PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 运行测试, 非并行模式
"tf": px.Graph.from_specs(
[
px.TaskSpec(
"pytest",
cmd=[
"pytest",
"-m",
"not slow",
"--dist",
"loadfile",
"--color=yes",
"--durations=10",
],
conditions=(_PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 运行测试并生成覆盖率报告, 跳过 slow, 并行模式
"tc": px.Graph.from_specs(
[
px.TaskSpec(
"pytest_cov",
cmd=[
"pytest",
"-m",
"not slow",
"--cov",
"-n",
"auto",
"--dist",
"loadfile",
"--tb=short",
"-v",
"--color=yes",
"--durations=10",
],
conditions=(_PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 代码格式化与检查
"lint": px.Graph.from_specs(
[
px.TaskSpec(
"ruff_check",
cmd=[
"ruff",
"check",
"--fix",
"--unsafe-fixes",
],
conditions=(_RUFF_CONDITION,),
timeout=conf.TIMEOUT,
cwd=Path(conf.PROJECT_ROOT),
),
]
),
# 类型检查
"typecheck": px.Graph.from_specs(
[
px.TaskSpec(
"ty_check",
cmd=["ty", "check", "src/bitool"],
conditions=(BuiltinConditions.HAS_APP_INSTALLED("ty"),),
),
]
),
# 构建文档
"doc": px.Graph.from_specs(
[
px.TaskSpec(
"sphinx_build",
cmd=conf.DOC_BUILD_COMMAND,
conditions=(
BuiltinConditions.HAS_APP_INSTALLED(conf.DOC_BUILD_TOOL),
),
),
]
),
# === 发布命令 ===
# 发布 Python 主包到 PyPI
"pb": px.Graph.from_specs(
[
px.TaskSpec(
"publish_python",
cmd=["hatch", "publish"],
cwd=Path(conf.PROJECT_ROOT),
conditions=(_HATCH_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 发布所有包(先 Rust 后 Python
"pba": px.Graph.from_specs(
[
px.TaskSpec(
"publish_rust",
cmd=[
"twine",
"upload",
"--disable-progress-bar",
conf.CORE_PATTERN,
],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
px.TaskSpec(
"publish_python",
cmd=["hatch", "publish"],
cwd=Path(conf.PROJECT_ROOT),
conditions=(_HATCH_CONDITION,),
timeout=conf.TIMEOUT,
depends_on=("publish_rust",),
),
]
),
# 发布 Rust 核心模块 (maturin publish)
"pbc": px.Graph.from_specs(
[
px.TaskSpec(
"publish_rust",
cmd=["maturin", "publish"],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# === 多版本测试命令 ===
# 运行多版本 Python 测试 (tox)
"tox": px.Graph.from_specs(
[
px.TaskSpec(
"tox_run",
cmd=["tox", "-p", "auto"],
conditions=(_TOX_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 安装多版本 Python (仅安装不测试)
"tox-install": px.Graph.from_specs(
[
px.TaskSpec(
"uv_python_install",
cmd=[
"uv",
"python",
"install",
"3.8",
"3.9",
"3.10",
"3.11",
"3.12",
"3.13",
"3.14",
],
conditions=(_UV_CONDITION,),
timeout=600,
),
]
),
}
def main():
"""
╔══════════════════════════════════════════════════════════╗
@@ -81,7 +424,9 @@ def main():
pymake tox-install - 安装所有 Python 版本 (仅安装不测试)
📦 发布命令:
pymake pb - 发布到 PyPI (先 Rust 后 Python)
pymake pb - 发布到 PyPI (hatch publish)
pymake pba - 发布所有包 (先 Rust 后 Python)
pymake pbc - 发布 Rust 核心模块 (maturin publish)
💡 常用工作流:
1. 初始化开发环境: pymake ia
@@ -96,349 +441,11 @@ def main():
pymake ia # 安装开发环境
pymake t # 运行测试
pymake tox # 多版本兼容性测试
pymake lint # 格式化代码
pymake lint # 格式化代码
pymake ca # 清理所有构建产物
"""
pymake_graph = px.Graph.from_specs(
[
px.TaskSpec("b", cmd=conf.BUILD_COMMAND),
px.TaskSpec("bc", cmd=conf.MATURIN_BUILD_COMMAND),
px.TaskSpec("ic", cmd=conf.DOC_BUILD_COMMAND),
]
runner = px.CliRunner(
description="PyMake - Python 构建工具 (替代 Makefile)",
**_build_graphs(),
)
px.run(pymake_graph)
# class PyMakeSkill(MapSkill):
# """PyMake 构建技能."""
# name: ClassVar[str] = "pymake"
# description: ClassVar[str] = "Bitool PyMake - Python构建工具"
# @override
# def create_scheduler_map(
# self,
# args: argparse.Namespace,
# ) -> dict[str, CommandScheduler] | None:
# return {
# # === 构建命令 ===
# # 构建 Python 包
# "b": CommandScheduler(
# commands=[
# RunCommand(
# cmd=conf.BUILD_COMMAND,
# allow_conditions=[_UV_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # 构建 Rust 核心模块
# "bc": CommandScheduler(
# commands=[
# RunCommand(
# cmd=_get_maturin_build_command(),
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # 构建双包(先 Rust 后 Python
# "ba": CommandScheduler(
# commands=[
# RunCommand(
# name="maturin_build",
# cmd=_get_maturin_build_command(),
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# RunCommand(
# name="uv_build",
# cmd=conf.BUILD_COMMAND,
# allow_conditions=[_UV_CONDITION],
# timeout=conf.TIMEOUT,
# dependencies=["maturin_build"],
# ),
# ],
# ),
# # === 安装命令(开发模式) ===
# # 安装 Rust 核心模块
# "ic": CommandScheduler(
# commands=[
# RunCommand(
# cmd=conf.MATURIN_DEV_COMMAND,
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# ),
# ],
# ),
# # 安装 Python 主包
# "ip": CommandScheduler(
# commands=[
# RunCommand(
# cmd=["uv", "pip", "install", "-e", "."],
# allow_conditions=[_UV_CONDITION],
# success_codes={0, 2},
# ),
# ],
# ),
# # 安装双包(开发模式)
# "ia": CommandScheduler(
# commands=[
# RunCommand(
# cmd=conf.MATURIN_DEV_COMMAND,
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# ),
# RunCommand(
# cmd=["uv", "pip", "install", "-e", "."],
# allow_conditions=[_UV_CONDITION],
# success_codes={0, 2},
# ),
# ],
# ),
# # === 清理命令 ===
# # 清理 Python 构建产物
# "cp": CommandScheduler(
# commands=[
# RunCommand(
# cmd=["rm", "-rf", *conf.PYTHON_BUILD_DIRS],
# allow_conditions=[_GIT_CONDITION], # 使用 git clean 更安全
# ),
# ],
# ),
# # 清理 Rust 构建产物
# "cc": CommandScheduler(
# commands=[
# RunCommand(
# cmd=["cargo", "clean"],
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[
# _MATURIN_CONDITION,
# ], # 有 maturin 说明有 cargo
# ),
# ],
# ),
# # 清理所有构建产物
# "ca": CommandScheduler(
# commands=[
# RunCommand(
# cmd=["cargo", "clean"],
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# ),
# RunCommand(
# cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE],
# allow_conditions=[_GIT_CONDITION],
# ),
# ],
# ),
# # === 开发工具 ===
# # 运行测试, 跳过 slow, 并行模式
# "t": CommandScheduler(
# commands=[
# RunCommand(
# cmd=[
# "pytest",
# "-m",
# "not slow",
# "-n",
# "8",
# "--dist",
# "loadfile",
# "--color=yes",
# "--durations=10",
# ],
# allow_conditions=[_PYTEST_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # 运行测试, 非并行模式
# "tf": CommandScheduler(
# commands=[
# RunCommand(
# cmd=[
# "pytest",
# "-m",
# "not slow",
# "--dist",
# "loadfile",
# "--color=yes",
# "--durations=10",
# ],
# allow_conditions=[_PYTEST_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # 运行测试并生成覆盖率报告, 跳过 slow, 并行模式
# # --dist loadfile: 按文件分发测试, 减少模块导入开销
# "tc": CommandScheduler(
# commands=[
# RunCommand(
# cmd=[
# "pytest",
# "-m",
# "not slow",
# "--cov",
# "-n",
# "auto",
# "--dist",
# "loadfile",
# "--tb=short",
# "-v",
# "--color=yes",
# "--durations=10",
# ],
# allow_conditions=[_PYTEST_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # 代码格式化与检查
# "lint": CommandScheduler(
# commands=[
# RunCommand(
# cmd=[
# "ruff",
# "check",
# "--fix",
# "--unsafe-fixes",
# ],
# allow_conditions=[_RUFF_CONDITION],
# timeout=conf.TIMEOUT,
# cwd=Path(conf.PROJECT_ROOT),
# ),
# ],
# ),
# # 类型检查
# "typecheck": CommandScheduler(
# commands=[
# RunCommand(
# cmd=["ty", "check", "src/bitool"],
# allow_conditions=[BuiltinConditions.HAS_APP_INSTALLED("ty")],
# ),
# ],
# ),
# # 构建文档
# "doc": CommandScheduler(
# commands=[
# RunCommand(
# cmd=conf.DOC_BUILD_COMMAND,
# allow_conditions=[
# BuiltinConditions.HAS_APP_INSTALLED(conf.DOC_BUILD_TOOL),
# ],
# ),
# ],
# ),
# # 发布到 PyPI(先发布 Rust 核心模块,再发布 Python 主包)
# "pb": CommandScheduler(
# commands=[
# # 发布 Python 主包(在项目根目录执行,依赖 Rust 发布成功)
# RunCommand(
# name="publish-python",
# cmd=["hatch", "publish"],
# cwd=Path(conf.PROJECT_ROOT),
# allow_conditions=[_HATCH_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# "pba": CommandScheduler(
# commands=[
# # 发布 Rust 核心模块(在 core 目录执行)
# RunCommand(
# name="publish-rust",
# # --disable-progress-bar: 避免 Windows GBK 控制台渲染 rich 进度条
# # 中的 \u2022 字符导致 UnicodeEncodeError
# cmd=[
# "twine",
# "upload",
# "--disable-progress-bar",
# conf.CORE_PATTERN,
# ],
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# RunCommand(
# name="publish-python",
# cmd=["hatch", "publish"],
# cwd=Path(conf.PROJECT_ROOT),
# allow_conditions=[_HATCH_CONDITION],
# timeout=conf.TIMEOUT,
# dependencies=["publish-rust"],
# ),
# ],
# ),
# "pbc": CommandScheduler(
# commands=[
# # 发布 Rust 核心模块(在 core 目录执行)
# RunCommand(
# name="publish-rust",
# cmd=["maturin", "publish"],
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # === 多版本测试命令 ===
# # 运行多版本 Python 测试 (tox)
# "tox": CommandScheduler(
# commands=[
# RunCommand(
# cmd=["tox", "-p", "auto"],
# allow_conditions=[_TOX_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # 安装多版本 Python (仅安装不测试)
# "tox-install": CommandScheduler(
# commands=[
# RunCommand(
# cmd=[
# "uv",
# "python",
# "install",
# "3.8",
# "3.9",
# "3.10",
# "3.11",
# "3.12",
# "3.13",
# "3.14",
# ],
# allow_conditions=[_UV_CONDITION],
# timeout=600,
# ),
# ],
# ),
# }
# def _get_maturin_build_command() -> list[str]:
# """获取 maturin 构建命令(根据平台自动添加参数).
# Returns
# -------
# list[str]
# 完整的 maturin 构建命令列表.
# """
# base_cmd = conf.MATURIN_BUILD_COMMAND.copy()
# if Constants.IS_WINDOWS:
# base_cmd.extend(conf.MATURIN_BUILD_OPTIONS_WIN7)
# return base_cmd
# # 命令条件判断
# _MATURIN_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.MATURIN_TOOL)
# _PYTEST_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("pytest")
# _UV_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.BUILD_TOOL)
# _HATCH_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("hatch")
# _RUFF_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("ruff")
# _GIT_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("git")
# _TOX_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("tox")
runner.run_cli()
+241
View File
@@ -0,0 +1,241 @@
"""命令行运行器:根据用户输入执行对应的任务流图.
参考 bitool_skill 的 MapSkill 设计, 将命令名映射到 Graph 实例,
通过 argparse 解析用户输入的命令并执行对应的图.
与 bitool_skill.MapSkill 的区别:
- MapSkill 通过继承 + create_scheduler_map 构建命令映射
- CliRunner 通过关键字参数直接注入命令到图的映射, 更声明式
- CliRunner 复用 pyflowx 的 DAG 调度能力 (run/Graph/TaskSpec)
"""
from __future__ import annotations
import argparse
import enum
import sys
from typing import Dict, List, Optional, Sequence
from ..errors import PyFlowXError
from ..executors import Strategy, run
from ..graph import Graph
__all__ = ["CliRunner", "CliExitCode"]
class CliExitCode(enum.IntEnum):
"""CliRunner 退出码."""
SUCCESS = 0
FAILURE = 1
INTERRUPTED = 130 # 与 POSIX 信号中断一致
class CliRunner:
"""命令行运行器: 根据用户输入执行对应的任务流图.
参考 bitool_skill 的 MapSkill 设计, 将命令名映射到 Graph 实例.
通过 ``sys.argv`` 解析用户输入的命令, 执行对应的图.
Parameters
----------
strategy : str
默认执行策略 (``"sequential"`` / ``"thread"`` / ``"async"``).
可被命令行 ``--strategy`` 覆盖.
description : str
CLI 描述文本, 显示在 ``--help`` 中.
**graphs : Graph
命令名到图的映射. 每个 key 是一个命令名, value 是对应的
:class:`~pyflowx.graph.Graph`.
Examples
--------
基本用法::
runner = px.CliRunner(
clean=px.Graph.from_specs([
px.TaskSpec("cargo_clean", cmd=["cargo", "clean"]),
]),
build=px.Graph.from_specs([
px.TaskSpec("uv_build", cmd=["uv", "build"]),
]),
)
runner.run() # 解析 sys.argv
指定策略与描述::
runner = px.CliRunner(
strategy="thread",
description="My build tool",
test=px.Graph.from_specs([...]),
)
runner.run(["test", "--strategy", "sequential"])
"""
def __init__(
self,
*,
strategy: Strategy = "sequential",
description: str = "",
**graphs: Graph,
) -> None:
if not graphs:
raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)")
# 校验所有值都是 Graph
for name, graph in graphs.items():
if not isinstance(graph, Graph):
raise TypeError(f"CliRunner 命令 {name!r} 的值必须是 Graph 实例, 实际是 {type(graph).__name__}")
self._graphs: Dict[str, Graph] = dict(graphs)
self._strategy: Strategy = strategy
self._description: str = description
# ------------------------------------------------------------------ #
# 内省
# ------------------------------------------------------------------ #
@property
def commands(self) -> List[str]:
"""可用的命令列表 (按插入顺序)."""
return list(self._graphs.keys())
@property
def graphs(self) -> Dict[str, Graph]:
"""命令名到图的映射 (只读副本)."""
return dict(self._graphs)
@property
def strategy(self) -> Strategy:
"""默认执行策略."""
return self._strategy
@property
def description(self) -> str:
"""CLI 描述文本."""
return self._description
# ------------------------------------------------------------------ #
# 参数解析
# ------------------------------------------------------------------ #
def _prog_name(self) -> str:
"""从 sys.argv[0] 推导程序名."""
import os
return os.path.basename(sys.argv[0]) if sys.argv else "pyflowx"
def create_parser(self) -> argparse.ArgumentParser:
"""创建参数解析器.
子类可覆盖此方法以添加自定义参数. 覆盖时应保留 ``command``
位置参数与 ``--strategy`` / ``--dry-run`` / ``--list`` 选项,
否则 :meth:`run` 的默认逻辑可能失效.
Returns
-------
argparse.ArgumentParser
新创建的参数解析器实例.
"""
parser = argparse.ArgumentParser(
prog=self._prog_name(),
description=self._description or "PyFlowX CLI Runner",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=self._format_commands_help(),
)
parser.add_argument(
"command",
nargs="?",
help="要执行的命令",
)
parser.add_argument(
"--strategy",
choices=["sequential", "thread", "async"],
default=self._strategy,
help="执行策略 (默认: %(default)s)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="只打印执行计划, 不实际运行",
)
parser.add_argument(
"--list",
action="store_true",
help="列出所有可用命令",
)
return parser
def _format_commands_help(self) -> str:
"""格式化命令帮助文本."""
lines = ["可用命令:"]
for cmd in self._graphs:
lines.append(f" {cmd}")
return "\n".join(lines)
# ------------------------------------------------------------------ #
# 执行
# ------------------------------------------------------------------ #
def run(self, args: Optional[Sequence[str]] = None) -> int:
"""解析参数并执行对应的图.
Parameters
----------
args : Sequence[str] | None
参数列表, 默认使用 ``sys.argv[1:]``.
Returns
-------
int
退出码 (0 成功, 1 失败, 130 中断).
Raises
------
SystemExit
当 argparse 无法解析参数时 (与标准 argparse 行为一致).
"""
parser = self.create_parser()
parsed = parser.parse_args(args)
# --list: 列出命令
if parsed.list:
print(self._format_commands_help())
return CliExitCode.SUCCESS.value
# 无命令: 显示帮助
if not parsed.command:
parser.print_help()
return CliExitCode.FAILURE.value
# 验证命令
if parsed.command not in self._graphs:
available = ", ".join(self._graphs.keys())
print(
f"错误: 未知命令 {parsed.command!r} (可用命令: {available})",
file=sys.stderr,
)
return CliExitCode.FAILURE.value
# 执行对应的图
graph = self._graphs[parsed.command]
try:
report = run(
graph,
strategy=parsed.strategy,
dry_run=parsed.dry_run,
)
return CliExitCode.SUCCESS.value if report.success else CliExitCode.FAILURE.value
except KeyboardInterrupt:
print("\n操作已取消", file=sys.stderr)
return CliExitCode.INTERRUPTED.value
except PyFlowXError as e:
print(f"错误: {e}", file=sys.stderr)
return CliExitCode.FAILURE.value
def run_cli(self, args: Optional[Sequence[str]] = None) -> None:
"""运行并以退出码退出进程.
作为 CLI 工具运行时的入口点, 等价于 ``sys.exit(self.run(args))``.
Parameters
----------
args : Sequence[str] | None
参数列表, 默认使用 ``sys.argv[1:]``.
"""
sys.exit(self.run(args))
+573
View File
@@ -0,0 +1,573 @@
"""Tests for CliRunner: command dispatch, argument parsing, exit codes."""
from __future__ import annotations
import sys
from typing import Any, List
from unittest.mock import patch
import pytest
import pyflowx as px
from pyflowx.cli import CliExitCode, CliRunner
from pyflowx.errors import PyFlowXError, TaskFailedError
# 跨平台的 echo 命令
if sys.platform == "win32":
ECHO_CMD = ["cmd", "/c", "echo"]
else:
ECHO_CMD = ["echo"]
# ---------------------------------------------------------------------- #
# 辅助工厂
# ---------------------------------------------------------------------- #
def _echo_graph(name: str = "echo_task", msg: str = "hello") -> px.Graph:
"""构造一个单任务 echo 图, 用于执行成功场景."""
return px.Graph.from_specs([px.TaskSpec(name, cmd=[*ECHO_CMD, msg])])
def _failing_graph() -> px.Graph:
"""构造一个必定失败的单任务图."""
return px.Graph.from_specs(
[
px.TaskSpec(
"fail",
cmd=["python", "-c", "import sys; sys.exit(1)"],
)
]
)
def _multi_task_graph() -> px.Graph:
"""构造一个带依赖的多任务图."""
return px.Graph.from_specs(
[
px.TaskSpec("a", cmd=[*ECHO_CMD, "a"]),
px.TaskSpec("b", cmd=[*ECHO_CMD, "b"], depends_on=("a",)),
]
)
# ---------------------------------------------------------------------- #
# 构造与校验
# ---------------------------------------------------------------------- #
class TestCliRunnerConstruction:
"""测试 CliRunner 的构造与参数校验."""
def test_requires_at_least_one_command(self) -> None:
"""没有命令时应抛出 ValueError."""
with pytest.raises(ValueError, match="至少需要一个命令"):
px.CliRunner()
def test_accepts_single_graph(self) -> None:
"""单个命令应正常构造."""
runner = px.CliRunner(clean=_echo_graph())
assert runner.commands == ["clean"]
def test_accepts_multiple_graphs(self) -> None:
"""多个命令应按插入顺序保留."""
runner = px.CliRunner(
clean=_echo_graph("c", "clean"),
build=_echo_graph("b", "build"),
test=_echo_graph("t", "test"),
)
assert runner.commands == ["clean", "build", "test"]
def test_rejects_non_graph_value(self) -> None:
"""非 Graph 值应抛出 TypeError."""
with pytest.raises(TypeError, match="必须是 Graph 实例"):
px.CliRunner(clean="not a graph") # type: ignore[arg-type]
def test_rejects_non_graph_list(self) -> None:
"""列表类型的值应抛出 TypeError."""
with pytest.raises(TypeError, match="必须是 Graph 实例"):
px.CliRunner(build=[1, 2, 3]) # type: ignore[arg-type]
def test_default_strategy_is_sequential(self) -> None:
"""默认策略应为 sequential."""
runner = px.CliRunner(clean=_echo_graph())
assert runner.strategy == "sequential"
def test_custom_strategy(self) -> None:
"""应支持自定义策略."""
runner = px.CliRunner(strategy="thread", clean=_echo_graph())
assert runner.strategy == "thread"
def test_default_description_is_empty(self) -> None:
"""默认描述应为空字符串."""
runner = px.CliRunner(clean=_echo_graph())
assert runner.description == ""
def test_custom_description(self) -> None:
"""应支持自定义描述."""
runner = px.CliRunner(description="My CLI", clean=_echo_graph())
assert runner.description == "My CLI"
# ---------------------------------------------------------------------- #
# 属性与内省
# ---------------------------------------------------------------------- #
class TestCliRunnerProperties:
"""测试 CliRunner 的属性访问."""
def test_commands_returns_list(self) -> None:
"""commands 应返回列表."""
runner = px.CliRunner(a=_echo_graph(), b=_echo_graph())
assert isinstance(runner.commands, list)
def test_graphs_returns_copy(self) -> None:
"""graphs 应返回副本, 修改不影响内部状态."""
runner = px.CliRunner(a=_echo_graph(), b=_echo_graph())
graphs = runner.graphs
graphs["c"] = _echo_graph()
assert "c" not in runner.graphs
def test_graphs_contains_original_graphs(self) -> None:
"""graphs 应包含原始 Graph 实例."""
g = _echo_graph()
runner = px.CliRunner(cmd=g)
assert runner.graphs["cmd"] is g
# ---------------------------------------------------------------------- #
# 参数解析
# ---------------------------------------------------------------------- #
class TestCliRunnerParser:
"""测试参数解析器."""
def test_create_parser_returns_argument_parser(self) -> None:
"""create_parser 应返回 ArgumentParser."""
from argparse import ArgumentParser
runner = px.CliRunner(clean=_echo_graph())
parser = runner.create_parser()
assert isinstance(parser, ArgumentParser)
def test_parser_has_command_argument(self) -> None:
"""解析器应有 command 位置参数."""
runner = px.CliRunner(clean=_echo_graph())
parser = runner.create_parser()
parsed = parser.parse_args(["clean"])
assert parsed.command == "clean"
def test_parser_command_is_optional(self) -> None:
"""command 应为可选参数."""
runner = px.CliRunner(clean=_echo_graph())
parser = runner.create_parser()
parsed = parser.parse_args([])
assert parsed.command is None
def test_parser_has_strategy_option(self) -> None:
"""解析器应有 --strategy 选项."""
runner = px.CliRunner(clean=_echo_graph())
parser = runner.create_parser()
parsed = parser.parse_args(["clean", "--strategy", "thread"])
assert parsed.strategy == "thread"
def test_parser_strategy_default(self) -> None:
"""--strategy 默认值应与构造时一致."""
runner = px.CliRunner(strategy="async", clean=_echo_graph())
parser = runner.create_parser()
parsed = parser.parse_args(["clean"])
assert parsed.strategy == "async"
def test_parser_strategy_invalid_choice(self) -> None:
"""--strategy 不接受非法值."""
runner = px.CliRunner(clean=_echo_graph())
parser = runner.create_parser()
with pytest.raises(SystemExit):
parser.parse_args(["clean", "--strategy", "invalid"])
def test_parser_has_dry_run_flag(self) -> None:
"""解析器应有 --dry-run 标志."""
runner = px.CliRunner(clean=_echo_graph())
parser = runner.create_parser()
parsed = parser.parse_args(["clean", "--dry-run"])
assert parsed.dry_run is True
def test_parser_dry_run_default_false(self) -> None:
"""--dry-run 默认为 False."""
runner = px.CliRunner(clean=_echo_graph())
parser = runner.create_parser()
parsed = parser.parse_args(["clean"])
assert parsed.dry_run is False
def test_parser_has_list_flag(self) -> None:
"""解析器应有 --list 标志."""
runner = px.CliRunner(clean=_echo_graph())
parser = runner.create_parser()
parsed = parser.parse_args(["--list"])
assert parsed.list is True
def test_format_commands_help_contains_all_commands(self) -> None:
"""帮助文本应包含所有命令."""
runner = px.CliRunner(
clean=_echo_graph("c", "clean"),
build=_echo_graph("b", "build"),
)
help_text = runner._format_commands_help()
assert "clean" in help_text
assert "build" in help_text
assert "可用命令" in help_text
# ---------------------------------------------------------------------- #
# 执行: 成功路径
# ---------------------------------------------------------------------- #
class TestCliRunnerRunSuccess:
"""测试 CliRunner.run 的成功执行路径."""
def test_run_valid_command_returns_zero(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""有效命令执行成功应返回 0."""
runner = px.CliRunner(echo=_echo_graph())
exit_code = runner.run(["echo"])
assert exit_code == CliExitCode.SUCCESS.value
def test_run_executes_correct_graph(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""应执行用户指定的命令对应的图."""
executed: List[str] = []
def track_a() -> None:
executed.append("a")
def track_b() -> None:
executed.append("b")
runner = px.CliRunner(
a=px.Graph.from_specs([px.TaskSpec("a", track_a)]),
b=px.Graph.from_specs([px.TaskSpec("b", track_b)]),
)
runner.run(["b"])
assert executed == ["b"]
def test_run_multi_task_graph(self) -> None:
"""应能执行带依赖的多任务图."""
runner = px.CliRunner(multi=_multi_task_graph())
exit_code = runner.run(["multi"])
assert exit_code == CliExitCode.SUCCESS.value
def test_run_with_strategy_override(self) -> None:
"""应支持通过 --strategy 覆盖默认策略."""
runner = px.CliRunner(strategy="sequential", echo=_echo_graph())
exit_code = runner.run(["echo", "--strategy", "thread"])
assert exit_code == CliExitCode.SUCCESS.value
def test_run_with_dry_run(self, capsys: pytest.CaptureFixture[str]) -> None:
"""--dry-run 应只打印计划不执行."""
runner = px.CliRunner(echo=_echo_graph())
exit_code = runner.run(["echo", "--dry-run"])
assert exit_code == CliExitCode.SUCCESS.value
captured = capsys.readouterr()
assert "Dry run" in captured.out
# ---------------------------------------------------------------------- #
# 执行: 失败路径
# ---------------------------------------------------------------------- #
class TestCliRunnerRunFailure:
"""测试 CliRunner.run 的失败执行路径."""
def test_run_unknown_command_returns_failure(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""未知命令应返回 1 并打印错误."""
runner = px.CliRunner(clean=_echo_graph())
exit_code = runner.run(["unknown"])
assert exit_code == CliExitCode.FAILURE.value
captured = capsys.readouterr()
assert "未知命令" in captured.err
assert "clean" in captured.err
def test_run_no_command_returns_failure(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""无命令时应返回 1 并打印帮助."""
runner = px.CliRunner(clean=_echo_graph())
exit_code = runner.run([])
assert exit_code == CliExitCode.FAILURE.value
captured = capsys.readouterr()
assert "可用命令" in captured.out or "可用命令" in captured.err
def test_run_failing_task_returns_failure(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""任务失败时应返回 1."""
runner = px.CliRunner(fail=_failing_graph())
exit_code = runner.run(["fail"])
assert exit_code == CliExitCode.FAILURE.value
def test_run_failing_task_prints_error(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""任务失败时应打印错误信息."""
runner = px.CliRunner(fail=_failing_graph())
runner.run(["fail"])
captured = capsys.readouterr()
# PyFlowXError 信息应输出到 stderr
assert "错误" in captured.err or "失败" in captured.err
# ---------------------------------------------------------------------- #
# 执行: --list 选项
# ---------------------------------------------------------------------- #
class TestCliRunnerList:
"""测试 --list 选项."""
def test_list_returns_success(self, capsys: pytest.CaptureFixture[str]) -> None:
"""--list 应返回 0."""
runner = px.CliRunner(clean=_echo_graph(), build=_echo_graph())
exit_code = runner.run(["--list"])
assert exit_code == CliExitCode.SUCCESS.value
def test_list_prints_all_commands(self, capsys: pytest.CaptureFixture[str]) -> None:
"""--list 应打印所有命令."""
runner = px.CliRunner(
clean=_echo_graph("c", "clean"),
build=_echo_graph("b", "build"),
test=_echo_graph("t", "test"),
)
runner.run(["--list"])
captured = capsys.readouterr()
assert "clean" in captured.out
assert "build" in captured.out
assert "test" in captured.out
def test_list_does_not_execute_any_graph(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""--list 不应执行任何图."""
executed: List[str] = []
def track() -> None:
executed.append("ran")
runner = px.CliRunner(a=px.Graph.from_specs([px.TaskSpec("a", track)]))
runner.run(["--list"])
assert executed == []
# ---------------------------------------------------------------------- #
# 错误处理
# ---------------------------------------------------------------------- #
class TestCliRunnerErrorHandling:
"""测试错误处理."""
def test_keyboard_interrupt_returns_130(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""KeyboardInterrupt 应返回 130."""
runner = px.CliRunner(echo=_echo_graph())
def raise_interrupt(*args: Any, **kwargs: Any) -> None:
raise KeyboardInterrupt
with patch("pyflowx.cli.runner.run", side_effect=raise_interrupt):
exit_code = runner.run(["echo"])
assert exit_code == CliExitCode.INTERRUPTED.value
captured = capsys.readouterr()
assert "取消" in captured.err
def test_pyflowx_error_returns_failure(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""PyFlowXError 应返回 1."""
runner = px.CliRunner(echo=_echo_graph())
def raise_error(*args: Any, **kwargs: Any) -> None:
raise TaskFailedError("echo", RuntimeError("boom"), 1)
with patch("pyflowx.cli.runner.run", side_effect=raise_error):
exit_code = runner.run(["echo"])
assert exit_code == CliExitCode.FAILURE.value
captured = capsys.readouterr()
assert "错误" in captured.err
def test_generic_exception_propagates(self) -> None:
"""非 PyFlowXError 的异常应向上传播."""
class CustomError(Exception):
pass
runner = px.CliRunner(echo=_echo_graph())
def raise_custom(*args: Any, **kwargs: Any) -> None:
raise CustomError("unexpected")
with patch("pyflowx.cli.runner.run", side_effect=raise_custom):
with pytest.raises(CustomError):
runner.run(["echo"])
# ---------------------------------------------------------------------- #
# run_cli
# ---------------------------------------------------------------------- #
class TestCliRunnerRunCli:
"""测试 run_cli 方法."""
def test_run_cli_calls_sys_exit(self) -> None:
"""run_cli 应调用 sys.exit."""
runner = px.CliRunner(echo=_echo_graph())
with pytest.raises(SystemExit) as exc_info:
runner.run_cli(["echo"])
assert exc_info.value.code == CliExitCode.SUCCESS.value
def test_run_cli_exit_code_on_failure(self) -> None:
"""run_cli 失败时应以非零码退出."""
runner = px.CliRunner(fail=_failing_graph())
with pytest.raises(SystemExit) as exc_info:
runner.run_cli(["fail"])
assert exc_info.value.code == CliExitCode.FAILURE.value
def test_run_cli_no_args_uses_sys_argv(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""run_cli 无参数时应使用 sys.argv."""
monkeypatch.setattr(sys, "argv", ["pymake", "echo"])
runner = px.CliRunner(echo=_echo_graph())
with pytest.raises(SystemExit) as exc_info:
runner.run_cli()
assert exc_info.value.code == CliExitCode.SUCCESS.value
# ---------------------------------------------------------------------- #
# 退出码枚举
# ---------------------------------------------------------------------- #
class TestCliExitCode:
"""测试 CliExitCode 枚举."""
def test_success_is_zero(self) -> None:
assert CliExitCode.SUCCESS.value == 0
def test_failure_is_one(self) -> None:
assert CliExitCode.FAILURE.value == 1
def test_interrupted_is_130(self) -> None:
assert CliExitCode.INTERRUPTED.value == 130
def test_exit_codes_are_distinct(self) -> None:
values = {e.value for e in CliExitCode}
assert len(values) == 3
# ---------------------------------------------------------------------- #
# 集成测试
# ---------------------------------------------------------------------- #
class TestCliRunnerIntegration:
"""集成测试: CliRunner + Graph + TaskSpec + 条件."""
def test_condition_skipped_command_succeeds(self) -> None:
"""条件不满足时任务跳过, 整体仍成功."""
graph = px.Graph.from_specs(
[
px.TaskSpec(
"skip_me",
cmd=[*ECHO_CMD, "should not run"],
conditions=(lambda: False,),
),
]
)
runner = px.CliRunner(skip=graph)
exit_code = runner.run(["skip"])
assert exit_code == CliExitCode.SUCCESS.value
def test_condition_met_command_succeeds(self) -> None:
"""条件满足时任务执行, 整体成功."""
graph = px.Graph.from_specs(
[
px.TaskSpec(
"run_me",
cmd=[*ECHO_CMD, "should run"],
conditions=(lambda: True,),
),
]
)
runner = px.CliRunner(run=graph)
exit_code = runner.run(["run"])
assert exit_code == CliExitCode.SUCCESS.value
def test_diamond_dependency_graph(self) -> None:
"""菱形依赖图应正确执行."""
order: List[str] = []
def make(name: str) -> Any:
def fn() -> str:
order.append(name)
return name
return fn
graph = px.Graph.from_specs(
[
px.TaskSpec("a", make("a")),
px.TaskSpec("b", make("b"), depends_on=("a",)),
px.TaskSpec("c", make("c"), depends_on=("a",)),
px.TaskSpec("d", make("d"), depends_on=("b", "c")),
]
)
runner = px.CliRunner(diamond=graph)
exit_code = runner.run(["diamond"])
assert exit_code == CliExitCode.SUCCESS.value
assert order == ["a", "b", "c", "d"]
def test_mixed_fn_and_cmd_commands(self) -> None:
"""混合 fn 和 cmd 的命令应都能执行."""
runner = px.CliRunner(
fn_cmd=px.Graph.from_specs([px.TaskSpec("fn", fn=lambda: "fn-result")]),
cmd_cmd=px.Graph.from_specs(
[px.TaskSpec("cmd", cmd=[*ECHO_CMD, "cmd-result"])]
),
)
assert runner.run(["fn_cmd"]) == CliExitCode.SUCCESS.value
assert runner.run(["cmd_cmd"]) == CliExitCode.SUCCESS.value
def test_command_with_cwd(self) -> None:
"""带 cwd 的命令应正确执行."""
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as tmpdir:
if sys.platform == "win32":
ls_cmd = ["cmd", "/c", "dir"]
else:
ls_cmd = ["ls"]
graph = px.Graph.from_specs(
[px.TaskSpec("ls", cmd=ls_cmd, cwd=Path(tmpdir))]
)
runner = px.CliRunner(ls=graph)
exit_code = runner.run(["ls"])
assert exit_code == CliExitCode.SUCCESS.value
# ---------------------------------------------------------------------- #
# 导出测试
# ---------------------------------------------------------------------- #
class TestCliRunnerExport:
"""测试 CliRunner 从顶层包导出."""
def test_cli_runner_exported_from_pyflowx(self) -> None:
"""CliRunner 应从 pyflowx 顶层导出."""
assert hasattr(px, "CliRunner")
assert px.CliRunner is CliRunner
def test_cli_exit_code_exported_from_pyflowx(self) -> None:
"""CliExitCode 应从 pyflowx 顶层导出."""
assert hasattr(px, "CliExitCode")
assert px.CliExitCode is CliExitCode
def test_cli_runner_in_all(self) -> None:
"""CliRunner 应在 __all__ 中."""
assert "CliRunner" in px.__all__
def test_cli_exit_code_in_all(self) -> None:
"""CliExitCode 应在 __all__ 中."""
assert "CliExitCode" in px.__all__
if __name__ == "__main__":
pytest.main([__file__, "-v"])