refactor: 重构CliRunner,新增cmd工厂函数优化任务定义

1. 新增cmd工厂函数,简化TaskSpec创建并自动推导名称
2. 重构CliRunner,将graphs参数替换为tasks+aliases,支持扁平任务注册与别名映射
3. 替换所有cli工具中的旧版任务定义方式,使用新API简化代码
4. 补充对应测试用例,适配新的运行器API
This commit is contained in:
2026-06-28 17:52:52 +08:00
parent 40f641611b
commit 5e561b4b3a
11 changed files with 493 additions and 219 deletions
+2
View File
@@ -94,6 +94,7 @@ from .task import (
TaskResult, TaskResult,
TaskSpec, TaskSpec,
TaskStatus, TaskStatus,
cmd,
task, task,
task_template, task_template,
) )
@@ -136,6 +137,7 @@ __all__ = [
"TaskStatus", "TaskStatus",
"TaskTimeoutError", "TaskTimeoutError",
"build_call_args", "build_call_args",
"cmd",
"compose", "compose",
"describe_injection", "describe_injection",
"run", "run",
+2 -2
View File
@@ -86,9 +86,9 @@ def main() -> None:
runner = px.CliRunner( runner = px.CliRunner(
strategy="thread", strategy="thread",
description="FolderBack - 文件夹备份工具", description="FolderBack - 文件夹备份工具",
graphs={ aliases={
# 备份当前目录到 ./backup # 备份当前目录到 ./backup
"b": px.Graph.from_specs([folderback_default]), "b": folderback_default,
}, },
) )
runner.run_cli() runner.run_cli()
+2 -2
View File
@@ -74,9 +74,9 @@ def main() -> None:
runner = px.CliRunner( runner = px.CliRunner(
strategy="thread", strategy="thread",
description="FolderZip - 文件夹压缩工具", description="FolderZip - 文件夹压缩工具",
graphs={ aliases={
# 压缩当前目录下的所有文件夹 # 压缩当前目录下的所有文件夹
"z": px.Graph.from_specs([folderzip_default]), "z": folderzip_default,
}, },
) )
runner.run_cli() runner.run_cli()
+5 -5
View File
@@ -67,7 +67,7 @@ def main() -> None:
runner = px.CliRunner( runner = px.CliRunner(
strategy="thread", strategy="thread",
description="Gittool - Git 执行工具.", description="Gittool - Git 执行工具.",
graphs={ aliases={
# 添加并提交 # 添加并提交
"a": px.Graph.from_specs([ "a": px.Graph.from_specs([
px.TaskSpec("add", cmd=["git", "add", "."], conditions=(lambda _: has_files(),)), px.TaskSpec("add", cmd=["git", "add", "."], conditions=(lambda _: has_files(),)),
@@ -90,13 +90,13 @@ def main() -> None:
), ),
]), ]),
# 初始化子目录 # 初始化子目录
"isub": px.Graph.from_specs([isub]), "isub": isub,
# 推送 # 推送
"p": px.Graph.from_specs([push]), "p": push,
# 拉取 # 拉取
"pl": px.Graph.from_specs([pull]), "pl": pull,
# 重启TGit缓存 # 重启TGit缓存
"r": px.Graph.from_specs([kill_tgit]), "r": kill_tgit,
}, },
) )
runner.run_cli() runner.run_cli()
+53 -44
View File
@@ -24,33 +24,41 @@ def maturin_build_cmd() -> list[str]:
return command return command
uv_build: px.TaskSpec = px.TaskSpec("uv_build", cmd=["uv", "build"]) # 扁平注册所有任务(px.cmd 自动从命令前两段推导 name)
maturin_build: px.TaskSpec = px.TaskSpec("maturin_build", cmd=maturin_build_cmd()) tasks: list[px.TaskSpec] = [
uv_sync: px.TaskSpec = px.TaskSpec("uv_sync", cmd=["uv", "sync"]) px.cmd(["uv", "build"]),
git_clean: px.TaskSpec = px.TaskSpec("git_clean", cmd=["gitt", "c"]) px.cmd(maturin_build_cmd(), name="maturin_build"),
test: px.TaskSpec = px.TaskSpec( px.cmd(["uv", "sync"]),
"test", cmd=["pytest", "-m", "not slow", "-n", "8", "--dist", "loadfile", "--color=yes", "--durations=10"] px.cmd(["gitt", "c"], name="git_clean"),
) px.cmd(
test_fast: px.TaskSpec = px.TaskSpec( ["pytest", "-m", "not slow", "-n", "8", "--dist", "loadfile", "--color=yes", "--durations=10"],
"test_fast", cmd=["pytest", "-m", "not slow", "--dist", "loadfile", "--color=yes", "--durations=10"] name="test",
) ),
test_coverage: px.TaskSpec = px.TaskSpec( px.cmd(
"test_coverage", ["pytest", "-m", "not slow", "--dist", "loadfile", "--color=yes", "--durations=10"],
cmd=["pytest", "--cov", "-n", "8", "--dist", "loadfile", "--tb=short", "-v", "--color=yes", "--durations=10"], name="test_fast",
) ),
ruff_lint: px.TaskSpec = px.TaskSpec("lint", cmd=["ruff", "check", "--fix", "--unsafe-fixes"]) px.cmd(
typecheck: px.TaskSpec = px.TaskSpec("pyrefly_check", cmd=["pyrefly", "check", "."]) ["pytest", "--cov", "-n", "8", "--dist", "loadfile", "--tb=short", "-v", "--color=yes", "--durations=10"],
git_add_all: px.TaskSpec = px.TaskSpec("git_add_all", cmd=["git", "add", "-A"]) name="test_coverage",
bump: px.TaskSpec = px.TaskSpec("bumpversion", cmd=["bumpversion"]) ),
doc: px.TaskSpec = px.TaskSpec("doc", cmd=["sphinx-build", "-b", "html", "docs", "docs/_build"]) px.cmd(["pyrefly", "check", "."], name="pyrefly_check"),
git_push: px.TaskSpec = px.TaskSpec("git_push", cmd=["git", "push"]) px.cmd(["git", "add", "-A"], name="git_add_all"),
git_push_tags: px.TaskSpec = px.TaskSpec("git_push_tags", cmd=["git", "push", "--tags"]) px.cmd(["bumpversion"], name="bumpversion"),
hatch_publish: px.TaskSpec = px.TaskSpec("publish_python", cmd=["hatch", "publish"]) px.cmd(["bumpversion", "minor"], name="bumpversion_minor"),
twine_publish: px.TaskSpec = px.TaskSpec("twine_publish", cmd=["twine", "upload", "--disable-progress-bar"]) px.cmd(["git", "push"], name="git_push"),
tox: px.TaskSpec = px.TaskSpec("tox", cmd=["tox", "-p", "auto"]) px.cmd(["git", "push", "--tags"], name="git_push_tags"),
px.cmd(["hatch", "publish"], name="publish_python"),
px.cmd(["twine", "upload", "--disable-progress-bar"], name="twine_publish"),
]
# 单任务别名(alias 名与任务名相同):直接用 TaskSpec,避免 str 自引用
_doc = px.cmd(["sphinx-build", "-b", "html", "docs", "docs/_build"], name="doc")
_lint = px.cmd(["ruff", "check", "--fix", "--unsafe-fixes"], name="lint")
_tox = px.cmd(["tox", "-p", "auto"], name="tox")
def main(): def main() -> None:
"""pymake 构建工具. """pymake 构建工具.
🔨 构建命令: 🔨 构建命令:
@@ -78,10 +86,10 @@ def main():
📦 发布命令: 📦 发布命令:
pymake pb - 发布到 PyPI (twine + hatch) pymake pb - 发布到 PyPI (twine + hatch)
版本管理: 🔖 版本管理:
pymake bump - 自动升级版本号并提交修改 (清理 + 检查 + 格式化 + git add + bumpversion) pymake bump - 自动升级版本号并提交修改 (清理 + 检查 + 格式化 + git add + bumpversion)
💡 常用工作流: 💡 常用工作流:
1. 日常开发: pymake lint && pymake t 1. 日常开发: pymake lint && pymake t
2. 构建发布包: pymake ba 2. 构建发布包: pymake ba
3. 多版本兼容性测试: pymake tox 3. 多版本兼容性测试: pymake tox
@@ -98,28 +106,29 @@ def main():
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
description="PyMake - Python 构建工具", description="PyMake - Python 构建工具",
graphs={ tasks=tasks,
aliases={
# 构建命令 # 构建命令
"b": px.Graph.from_specs([uv_build]), "b": "uv_build",
"bc": px.Graph.from_specs([maturin_build]), "bc": "maturin_build",
"ba": px.Graph.from_specs(["b", "bc"]), "ba": ["b", "bc"],
# 安装命令 # 安装命令
"sync": px.Graph.from_specs([uv_sync]), "sync": "uv_sync",
# 清理命令 # 清理命令
"c": px.Graph.from_specs([git_clean]), "c": "git_clean",
# 开发工具 # 开发工具
"bump": px.Graph.from_specs(["c", "tc", git_add_all, bump]), "bump": ["c", "tc", "git_add_all", "bumpversion"],
"bumpmi": px.Graph.from_specs([px.TaskSpec("bumpversion_minor", cmd=["bumpversion", "minor"])]), "bumpmi": "bumpversion_minor",
"cov": px.Graph.from_specs([git_clean, test_coverage]), "cov": ["git_clean", "test_coverage"],
"doc": px.Graph.from_specs([doc]), "doc": _doc,
"lint": px.Graph.from_specs([ruff_lint]), "lint": _lint,
"pb": px.Graph.from_specs([twine_publish, hatch_publish]), "pb": ["twine_publish", "publish_python"],
"t": px.Graph.from_specs([test]), "t": "test",
"tf": px.Graph.from_specs([test_fast]), "tf": "test_fast",
"tc": px.Graph.from_specs([typecheck, "lint"]), "tc": ["pyrefly_check", "lint"],
"tox": px.Graph.from_specs([tox]), "tox": _tox,
# 发布命令 # 发布命令
"p": px.Graph.from_specs([git_clean, git_push, git_push_tags]), "p": ["git_clean", "git_push", "git_push_tags"],
}, },
) )
runner.run_cli() runner.run_cli()
+94 -35
View File
@@ -72,67 +72,126 @@ def _apply_verbose_to_graph(graph: Graph, verbose: bool) -> Graph:
class CliRunner: class CliRunner:
"""命令行运行器: 根据用户输入执行对应的任务流图. """命令行运行器: 根据用户输入执行对应的任务流图.
将命令名映射到 Graph 实例. 将命令名映射到 Graph 实例. 通过 ``sys.argv`` 解析用户输入的命令,
通过 ``sys.argv`` 解析用户输入的命令, 执行对应的图. 执行对应的图.
Parameters Parameters
---------- ----------
aliases : dict[str, str | list[str] | Graph]
命令别名到任务引用的映射. 每个值可以是:
* ``str`` —— 单个任务名 (引用 ``tasks`` 中注册的任务),
生成单任务图.
* ``list[str]`` —— 任务名列表, 自动 :meth:`Graph.chain` 建立链式依赖,
即后一个任务依赖前一个.
* :class:`~pyflowx.graph.Graph` —— 直接使用该图 (用于复杂场景, 如
自定义 ``conditions``、并行分支等).
tasks : list[TaskSpec]
扁平注册的任务列表. ``aliases`` 中的字符串引用这些任务名.
未被任何 alias 引用的任务不会被执行.
strategy : str | Strategy strategy : str | Strategy
默认执行策略 (``Strategy.SEQUENTIAL`` / ``Strategy.THREAD`` / 默认执行策略. 可被命令行 ``--strategy`` 覆盖.
``Strategy.ASYNC`` 或对应字符串). 可被命令行 ``--strategy`` 覆盖. description : str
CLI 帮助文本.
verbose : bool verbose : bool
是否显示详细执行过程. ``True`` 时打印任务生命周期和 subprocess 输出. 是否显示详细执行过程. 默认 ``True``, 可被命令行 ``--quiet`` 关闭.
默认 ``True``. 可被命令行 ``--quiet`` 关闭.
**graphs : Graph
命令名到图的映射. 每个 key 是一个命令名, value 是对应的
:class:`~pyflowx.graph.Graph`.
Examples Examples
-------- --------
基本用法:: 简单场景 (tasks + aliases)::
runner = px.CliRunner( runner = px.CliRunner(
clean=px.Graph.from_specs( tasks=[
[ px.cmd(["uv", "build"]), # name="uv_build"
px.TaskSpec("cargo_clean", cmd=["cargo", "clean"]), px.cmd(["maturin", "build"], name="maturin_build"),
] px.cmd(["ruff", "check", "--fix"], name="lint"),
), ],
build=px.Graph.from_specs( aliases={
[ "b": "uv_build",
px.TaskSpec("uv_build", cmd=["uv", "build"]), "ba": ["uv_build", "maturin_build"], # chain: maturin 依赖 uv
] "lint": "lint",
), },
) )
runner.run() # 解析 sys.argv runner.run()
指定策略与描述:: 复杂场景 (直接用 Graph)::
runner = px.CliRunner( runner = px.CliRunner(
strategy=px.Strategy.THREAD, aliases={
"a": px.Graph.from_specs([
px.TaskSpec("add", cmd=["git", "add", "."], conditions=(...)),
px.TaskSpec("commit", cmd=["git", "commit"], depends_on=("add",)),
]),
},
) )
runner.run(["test", "--strategy", "sequential"])
""" """
graphs: dict[str, Graph] = field(default_factory=dict) aliases: dict[str, str | list[str | TaskSpec[Any]] | TaskSpec[Any] | Graph] = field(default_factory=dict)
tasks: list[TaskSpec[Any]] = field(default_factory=list)
strategy: Strategy = field(default="dependency") strategy: Strategy = field(default="dependency")
description: str = field(default_factory=str) description: str = field(default_factory=str)
verbose: bool = field(default_factory=lambda: True) verbose: bool = field(default_factory=lambda: True)
# 解析后的命令→图映射,__post_init__ 填充
graphs: dict[str, Graph] = field(default_factory=dict, init=False)
def __post_init__(self) -> None: def __post_init__(self) -> None:
if not self.graphs: if not self.aliases:
raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)") raise ValueError("CliRunner 至少需要一个别名 (通过 aliases= 提供)")
# 解析并展开字符串引用,委托给 GraphComposer。 # 1. 把 tasks 注册为虚拟命令图(每个 task 一个图),加入 raw_graphs
# Graph 不再 frozen,可直接赋值,无需 object.__setattr__。 # 使 GraphComposer 能解析对它们的字符串引用
self.graphs = GraphComposer(self.graphs).resolve_all() raw_graphs: dict[str, Graph] = {}
for spec in self.tasks:
if spec.name in raw_graphs:
raise ValueError(f"任务名重复: {spec.name!r}")
raw_graphs[spec.name] = Graph.from_specs([spec])
# 2. 把每个 alias 转为 Graphalias 名可与 task 名相同,覆盖 task 注册)
for alias, value in self.aliases.items():
raw_graphs[alias] = self._alias_to_graph(alias, value)
# 3. 解析图间字符串引用(str / list[str] 引用其他 alias 或任务)
self.graphs = GraphComposer(raw_graphs).resolve_all()
@staticmethod
def _alias_to_graph(
alias: str,
value: str | list[str | TaskSpec[Any]] | TaskSpec[Any] | Graph,
) -> Graph:
"""把 alias 的值转换为 Graph.
* ``str`` —— 对其他 alias 或已注册任务名的引用, 由 GraphComposer 展开.
* ``TaskSpec`` —— 单个内联任务, 生成单任务图.
* ``list[str | TaskSpec]`` —— 引用/任务混合列表, GraphComposer 展开时
自动让后续引用依赖前面 (chain 语义). 元素为 alias 名、任务名或
:class:`TaskSpec` 对象 (内联任务).
* ``Graph`` —— 原样返回 (用于复杂场景: conditions、并行分支等).
"""
if isinstance(value, Graph):
return value
if isinstance(value, TaskSpec):
return Graph.from_specs([value])
if isinstance(value, str):
# 字符串引用,用 _pending_refs 占位,GraphComposer 后续展开
return Graph.from_specs([value]) # type: ignore[arg-type]
if isinstance(value, list):
if not value:
raise ValueError(f"别名 {alias!r} 的任务列表为空")
for item in value:
if not isinstance(item, (str, TaskSpec)):
raise TypeError(f"别名 {alias!r} 的列表元素类型无效: {type(item).__name__}, 预期 str 或 TaskSpec")
# str/TaskSpec 混合列表,由 GraphComposer 展开(自动建立 chain 依赖)
return Graph.from_specs(value)
raise TypeError(
f"别名 {alias!r} 的值类型无效: {type(value).__name__}, 预期 str/TaskSpec/list[str|TaskSpec]/Graph"
)
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# 内省 # 内省
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
@property @property
def commands(self) -> list[str]: def commands(self) -> list[str]:
"""可用的命令列表 (按插入顺序).""" """可用的命令列表 (按 aliases 定义顺序, 不含 tasks 中未引用的任务)."""
return list(self.graphs.keys()) return list(self.aliases.keys())
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# 参数解析 # 参数解析
@@ -225,9 +284,9 @@ class CliRunner:
parser.print_help() parser.print_help()
return CliExitCode.FAILURE.value return CliExitCode.FAILURE.value
# 验证命令 # 验证命令(必须是已注册的 alias,不接受裸任务名)
if parsed.command not in self.graphs: if parsed.command not in self.aliases:
available = ", ".join(self.graphs.keys()) available = ", ".join(self.commands)
print( print(
f"错误: 未知命令 {parsed.command!r} (可用命令: {available})", f"错误: 未知命令 {parsed.command!r} (可用命令: {available})",
file=sys.stderr, file=sys.stderr,
+32
View File
@@ -535,6 +535,38 @@ def task(
return _decorate(fn) return _decorate(fn)
def cmd(
command: list[str],
*,
name: str | None = None,
depends_on: tuple[str, ...] = (),
**kwargs: Any,
) -> TaskSpec[Any]:
"""从命令列表快速创建 :class:`TaskSpec`。
``name`` 默认为 ``"_".join(command[:2])``(如 ``["uv", "build"]`` → ``"uv_build"``)。
若命令不足两个元素则用 ``"_".join(command)``。
其余关键字参数透传给 :class:`TaskSpec`(如 ``depends_on``、``tags`` 等)。
Examples
--------
>>> uv_build = px.cmd(["uv", "build"])
>>> uv_build.name
'uv_build'
>>> lint = px.cmd(["ruff", "check", "--fix"], name="lint")
>>> lint.name
'lint'
"""
spec_name = name or "_".join(command[:2]) if len(command) >= 2 else "_".join(command)
return TaskSpec(
name=spec_name,
cmd=command,
depends_on=depends_on,
**kwargs,
)
def task_template( def task_template(
fn: TaskFn[Any] | None = None, fn: TaskFn[Any] | None = None,
cmd: TaskCmd | None = None, cmd: TaskCmd | None = None,
+74 -50
View File
@@ -70,92 +70,116 @@ class TestMaturinBuildCmd:
# ---------------------------------------------------------------------- # # ---------------------------------------------------------------------- #
# TaskSpec definitions # TaskSpec definitions
# ---------------------------------------------------------------------- # # ---------------------------------------------------------------------- #
def _find_task(name: str) -> pymake.px.TaskSpec:
"""从 pymake.tasks 或单任务别名变量中查找指定名称的 TaskSpec."""
for spec in pymake.tasks:
if spec.name == name:
return spec
# 单任务别名变量(_doc/_lint/_tox
alias_map = {"doc": pymake._doc, "lint": pymake._lint, "tox": pymake._tox}
if name in alias_map:
return alias_map[name]
raise KeyError(f"任务 {name!r} 未找到")
class TestTaskSpecDefinitions: class TestTaskSpecDefinitions:
"""Test that all TaskSpec definitions are valid.""" """Test that all TaskSpec definitions are valid."""
def test_uv_build_spec(self) -> None: def test_uv_build_spec(self) -> None:
"""uv_build spec should be properly defined.""" """uv_build spec should be properly defined."""
assert pymake.uv_build.name == "uv_build" spec = _find_task("uv_build")
assert pymake.uv_build.cmd == ["uv", "build"] assert spec.name == "uv_build"
assert pymake.uv_build.skip_if_missing is False assert spec.cmd == ["uv", "build"]
assert spec.skip_if_missing is False
def test_maturin_build_spec(self) -> None: def test_maturin_build_spec(self) -> None:
"""maturin_build spec should be properly defined.""" """maturin_build spec should be properly defined."""
assert pymake.maturin_build.name == "maturin_build" spec = _find_task("maturin_build")
assert isinstance(pymake.maturin_build.cmd, list) assert spec.name == "maturin_build"
assert pymake.maturin_build.skip_if_missing is False assert isinstance(spec.cmd, list)
assert spec.skip_if_missing is False
def test_uv_sync_spec(self) -> None: def test_uv_sync_spec(self) -> None:
"""uv_sync spec should be properly defined.""" """uv_sync spec should be properly defined."""
assert pymake.uv_sync.name == "uv_sync" spec = _find_task("uv_sync")
assert pymake.uv_sync.cmd == ["uv", "sync"] assert spec.name == "uv_sync"
assert pymake.uv_sync.skip_if_missing is False assert spec.cmd == ["uv", "sync"]
assert spec.skip_if_missing is False
def test_git_clean_spec(self) -> None: def test_git_clean_spec(self) -> None:
"""git_clean spec should be properly defined.""" """git_clean spec should be properly defined."""
assert pymake.git_clean.name == "git_clean" spec = _find_task("git_clean")
assert pymake.git_clean.cmd == ["gitt", "c"] assert spec.name == "git_clean"
assert pymake.git_clean.skip_if_missing is False assert spec.cmd == ["gitt", "c"]
assert spec.skip_if_missing is False
def test_test_spec(self) -> None: def test_test_spec(self) -> None:
"""test spec should be properly defined.""" """test spec should be properly defined."""
assert pymake.test.name == "test" spec = _find_task("test")
assert isinstance(pymake.test.cmd, list) assert spec.name == "test"
assert "pytest" in pymake.test.cmd assert isinstance(spec.cmd, list)
assert "-m" in pymake.test.cmd assert "pytest" in spec.cmd
assert "not slow" in pymake.test.cmd assert "-m" in spec.cmd
assert pymake.test.skip_if_missing is False assert "not slow" in spec.cmd
assert spec.skip_if_missing is False
def test_test_fast_spec(self) -> None: def test_test_fast_spec(self) -> None:
"""test_fast spec should be properly defined.""" """test_fast spec should be properly defined."""
assert pymake.test_fast.name == "test_fast" spec = _find_task("test_fast")
assert isinstance(pymake.test_fast.cmd, list) assert spec.name == "test_fast"
assert "pytest" in pymake.test_fast.cmd assert isinstance(spec.cmd, list)
assert "-n" not in pymake.test_fast.cmd # test_fast doesn't use parallel assert "pytest" in spec.cmd
assert pymake.test_fast.skip_if_missing is False assert "-n" not in spec.cmd # test_fast doesn't use parallel
assert spec.skip_if_missing is False
def test_test_coverage_spec(self) -> None: def test_test_coverage_spec(self) -> None:
"""test_coverage spec should be properly defined.""" """test_coverage spec should be properly defined."""
assert pymake.test_coverage.name == "test_coverage" spec = _find_task("test_coverage")
assert isinstance(pymake.test_coverage.cmd, list) assert spec.name == "test_coverage"
assert "pytest" in pymake.test_coverage.cmd assert isinstance(spec.cmd, list)
assert "--cov" in pymake.test_coverage.cmd assert "pytest" in spec.cmd
assert pymake.test_coverage.skip_if_missing is False assert "--cov" in spec.cmd
assert spec.skip_if_missing is False
def test_ruff_lint_spec(self) -> None: def test_ruff_lint_spec(self) -> None:
"""ruff_lint spec should be properly defined.""" """lint spec should be properly defined."""
assert pymake.ruff_lint.name == "lint" spec = _find_task("lint")
assert isinstance(pymake.ruff_lint.cmd, list) assert spec.name == "lint"
assert "ruff" in pymake.ruff_lint.cmd assert isinstance(spec.cmd, list)
assert "check" in pymake.ruff_lint.cmd assert "ruff" in spec.cmd
assert pymake.ruff_lint.skip_if_missing is False assert "check" in spec.cmd
assert spec.skip_if_missing is False
def test_doc_spec(self) -> None: def test_doc_spec(self) -> None:
"""doc spec should be properly defined.""" """doc spec should be properly defined."""
assert pymake.doc.name == "doc" spec = _find_task("doc")
assert isinstance(pymake.doc.cmd, list) assert spec.name == "doc"
assert "sphinx-build" in pymake.doc.cmd assert isinstance(spec.cmd, list)
assert pymake.doc.skip_if_missing is False assert "sphinx-build" in spec.cmd
assert spec.skip_if_missing is False
def test_hatch_publish_spec(self) -> None: def test_hatch_publish_spec(self) -> None:
"""hatch_publish spec should be properly defined.""" """publish_python spec should be properly defined."""
assert pymake.hatch_publish.name == "publish_python" spec = _find_task("publish_python")
assert pymake.hatch_publish.cmd == ["hatch", "publish"] assert spec.name == "publish_python"
assert pymake.hatch_publish.skip_if_missing is False assert spec.cmd == ["hatch", "publish"]
assert spec.skip_if_missing is False
def test_twine_publish_spec(self) -> None: def test_twine_publish_spec(self) -> None:
"""twine_publish spec should be properly defined.""" """twine_publish spec should be properly defined."""
assert pymake.twine_publish.name == "twine_publish" spec = _find_task("twine_publish")
assert isinstance(pymake.twine_publish.cmd, list) assert spec.name == "twine_publish"
assert "twine" in pymake.twine_publish.cmd assert isinstance(spec.cmd, list)
assert "upload" in pymake.twine_publish.cmd assert "twine" in spec.cmd
assert pymake.twine_publish.skip_if_missing is False assert "upload" in spec.cmd
assert spec.skip_if_missing is False
def test_tox_spec(self) -> None: def test_tox_spec(self) -> None:
"""tox spec should be properly defined.""" """tox spec should be properly defined."""
assert pymake.tox.name == "tox" spec = _find_task("tox")
assert pymake.tox.cmd == ["tox", "-p", "auto"] assert spec.name == "tox"
assert pymake.tox.skip_if_missing is False assert spec.cmd == ["tox", "-p", "auto"]
assert spec.skip_if_missing is False
# ---------------------------------------------------------------------- # # ---------------------------------------------------------------------- #
+19 -19
View File
@@ -17,7 +17,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"build": px.Graph.from_specs([build_task]), "build": px.Graph.from_specs([build_task]),
"test": px.Graph.from_specs([test_task]), "test": px.Graph.from_specs([test_task]),
"all": px.Graph.from_specs([build_task, "test"]), "all": px.Graph.from_specs([build_task, "test"]),
@@ -38,7 +38,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs([task1]), "cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs([task2]), "cmd2": px.Graph.from_specs([task2]),
"cmd3": px.Graph.from_specs([task3]), "cmd3": px.Graph.from_specs([task3]),
@@ -57,7 +57,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"lint": px.Graph.from_specs([lint_task, format_task]), "lint": px.Graph.from_specs([lint_task, format_task]),
"quick": px.Graph.from_specs(["lint.lint"]), "quick": px.Graph.from_specs(["lint.lint"]),
}, },
@@ -75,7 +75,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs([task1]), "cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs(["cmd1", task2]), "cmd2": px.Graph.from_specs(["cmd1", task2]),
"cmd3": px.Graph.from_specs(["cmd2", task3]), "cmd3": px.Graph.from_specs(["cmd2", task3]),
@@ -93,7 +93,7 @@ class TestCommandReferences:
with pytest.raises(ValueError, match="循环引用"): with pytest.raises(ValueError, match="循环引用"):
px.CliRunner( px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs(["cmd1", task1]), "cmd1": px.Graph.from_specs(["cmd1", task1]),
}, },
) )
@@ -105,7 +105,7 @@ class TestCommandReferences:
with pytest.raises(ValueError, match="引用的命令 'invalid' 不存在"): with pytest.raises(ValueError, match="引用的命令 'invalid' 不存在"):
px.CliRunner( px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs(["invalid", task1]), "cmd1": px.Graph.from_specs(["invalid", task1]),
}, },
) )
@@ -117,7 +117,7 @@ class TestCommandReferences:
with pytest.raises(ValueError, match="任务 'invalid' 不存在于命令 'cmd1'"): with pytest.raises(ValueError, match="任务 'invalid' 不存在于命令 'cmd1'"):
px.CliRunner( px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs([task1]), "cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs(["cmd1.invalid"]), "cmd2": px.Graph.from_specs(["cmd1.invalid"]),
}, },
@@ -130,7 +130,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs([task1, task2]), "cmd1": px.Graph.from_specs([task1, task2]),
"cmd2": px.Graph.from_specs(["cmd1"]), "cmd2": px.Graph.from_specs(["cmd1"]),
}, },
@@ -148,7 +148,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs([task1, task2]), "cmd1": px.Graph.from_specs([task1, task2]),
"cmd2": px.Graph.from_specs(["cmd1", task3]), "cmd2": px.Graph.from_specs(["cmd1", task3]),
}, },
@@ -168,7 +168,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs([task1]), "cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs([task2, task3]), "cmd2": px.Graph.from_specs([task2, task3]),
"cmd3": px.Graph.from_specs([task4]), "cmd3": px.Graph.from_specs([task4]),
@@ -205,7 +205,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs([task1]), "cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs([task2]), "cmd2": px.Graph.from_specs([task2]),
"all": px.Graph.from_specs(["cmd1", "cmd2", task3, task4, task5]), "all": px.Graph.from_specs(["cmd1", "cmd2", task3, task4, task5]),
@@ -242,7 +242,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs([task1, task2]), "cmd1": px.Graph.from_specs([task1, task2]),
"cmd2": px.Graph.from_specs([task3]), "cmd2": px.Graph.from_specs([task3]),
"all": px.Graph.from_specs(["cmd1", "cmd2", task4]), "all": px.Graph.from_specs(["cmd1", "cmd2", task4]),
@@ -279,7 +279,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"c": px.Graph.from_specs([git_clean]), "c": px.Graph.from_specs([git_clean]),
"tc": px.Graph.from_specs([typecheck, "lint"]), "tc": px.Graph.from_specs([typecheck, "lint"]),
"lint": px.Graph.from_specs([lint, format_task]), "lint": px.Graph.from_specs([lint, format_task]),
@@ -319,7 +319,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs([task1]), "cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs([task2]), "cmd2": px.Graph.from_specs([task2]),
"cmd3": px.Graph.from_specs([task3]), "cmd3": px.Graph.from_specs([task3]),
@@ -350,7 +350,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"all": px.Graph.from_specs([task1, task2, task3]), "all": px.Graph.from_specs([task1, task2, task3]),
}, },
) )
@@ -373,7 +373,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs([task1, task2]), "cmd1": px.Graph.from_specs([task1, task2]),
"all": px.Graph.from_specs(["cmd1"]), "all": px.Graph.from_specs(["cmd1"]),
}, },
@@ -399,7 +399,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs([task1]), "cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs(["cmd1", task2]), "cmd2": px.Graph.from_specs(["cmd1", task2]),
"cmd3": px.Graph.from_specs(["cmd2", task3]), "cmd3": px.Graph.from_specs(["cmd2", task3]),
@@ -430,7 +430,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"cmd1": px.Graph.from_specs([task1, task2]), # Parallel tasks "cmd1": px.Graph.from_specs([task1, task2]), # Parallel tasks
"cmd2": px.Graph.from_specs([task3, task4]), # Parallel tasks "cmd2": px.Graph.from_specs([task3, task4]), # Parallel tasks
"all": px.Graph.from_specs(["cmd1", "cmd2"]), "all": px.Graph.from_specs(["cmd1", "cmd2"]),
@@ -465,7 +465,7 @@ class TestCommandReferences:
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
graphs={ aliases={
"clean": px.Graph.from_specs([clean]), "clean": px.Graph.from_specs([clean]),
"build": px.Graph.from_specs([build1, build2]), "build": px.Graph.from_specs([build1, build2]),
"test": px.Graph.from_specs([test1, test2]), "test": px.Graph.from_specs([test1, test2]),
+174 -62
View File
@@ -53,18 +53,18 @@ class TestCliRunnerConstruction:
def test_requires_at_least_one_command(self) -> None: def test_requires_at_least_one_command(self) -> None:
"""没有命令时应抛出 ValueError.""" """没有命令时应抛出 ValueError."""
with pytest.raises(ValueError, match="至少需要一个命令"): with pytest.raises(ValueError, match="至少需要一个别名"):
_ = px.CliRunner() _ = px.CliRunner()
def test_accepts_single_graph(self) -> None: def test_accepts_single_graph(self) -> None:
"""单个命令应正常构造.""" """单个命令应正常构造."""
runner = px.CliRunner(graphs={"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
assert runner.commands == ["clean"] assert runner.commands == ["clean"]
def test_accepts_multiple_graphs(self) -> None: def test_accepts_multiple_graphs(self) -> None:
"""多个命令应按插入顺序保留.""" """多个命令应按插入顺序保留."""
runner = px.CliRunner( runner = px.CliRunner(
graphs={ aliases={
"clean": _echo_graph("c", "clean"), "clean": _echo_graph("c", "clean"),
"build": _echo_graph("b", "build"), "build": _echo_graph("b", "build"),
"test": _echo_graph("t", "test"), "test": _echo_graph("t", "test"),
@@ -74,37 +74,37 @@ class TestCliRunnerConstruction:
def test_default_strategy_is_dependency(self) -> None: def test_default_strategy_is_dependency(self) -> None:
"""默认策略应为 dependency(依赖驱动,最大并行度).""" """默认策略应为 dependency(依赖驱动,最大并行度)."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
assert runner.strategy == "dependency" assert runner.strategy == "dependency"
def test_custom_strategy_string(self) -> None: def test_custom_strategy_string(self) -> None:
"""应支持通过字符串指定策略.""" """应支持通过字符串指定策略."""
runner = px.CliRunner({"clean": _echo_graph()}, strategy="thread") runner = px.CliRunner(aliases={"clean": _echo_graph()}, strategy="thread")
assert runner.strategy == "thread" assert runner.strategy == "thread"
def test_custom_strategy_enum(self) -> None: def test_custom_strategy_enum(self) -> None:
"""应支持通过 Strategy 枚举指定策略.""" """应支持通过 Strategy 枚举指定策略."""
runner = px.CliRunner({"clean": _echo_graph()}, strategy="async") runner = px.CliRunner(aliases={"clean": _echo_graph()}, strategy="async")
assert runner.strategy == "async" assert runner.strategy == "async"
def test_default_verbose_is_true(self) -> None: def test_default_verbose_is_true(self) -> None:
"""默认 verbose 应为 True.""" """默认 verbose 应为 True."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
assert runner.verbose is True assert runner.verbose is True
def test_custom_verbose_false(self) -> None: def test_custom_verbose_false(self) -> None:
"""应支持关闭 verbose.""" """应支持关闭 verbose."""
runner = px.CliRunner({"clean": _echo_graph()}, verbose=False) runner = px.CliRunner(aliases={"clean": _echo_graph()}, verbose=False)
assert runner.verbose is False assert runner.verbose is False
def test_default_description_is_empty(self) -> None: def test_default_description_is_empty(self) -> None:
"""默认描述应为空字符串.""" """默认描述应为空字符串."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
assert runner.description == "" assert runner.description == ""
def test_custom_description(self) -> None: def test_custom_description(self) -> None:
"""应支持自定义描述.""" """应支持自定义描述."""
runner = px.CliRunner({"clean": _echo_graph()}, description="My CLI") runner = px.CliRunner(aliases={"clean": _echo_graph()}, description="My CLI")
assert runner.description == "My CLI" assert runner.description == "My CLI"
@@ -116,13 +116,13 @@ class TestCliRunnerProperties:
def test_commands_returns_list(self) -> None: def test_commands_returns_list(self) -> None:
"""commands 应返回列表.""" """commands 应返回列表."""
runner = px.CliRunner({"a": _echo_graph(), "b": _echo_graph()}) runner = px.CliRunner(aliases={"a": _echo_graph(), "b": _echo_graph()})
assert isinstance(runner.commands, list) assert isinstance(runner.commands, list)
def test_graphs_contains_original_graphs(self) -> None: def test_graphs_contains_original_graphs(self) -> None:
"""graphs 应包含原始 Graph 实例.""" """graphs 应包含原始 Graph 实例."""
g = _echo_graph() g = _echo_graph()
runner = px.CliRunner({"cmd": g}) runner = px.CliRunner(aliases={"cmd": g})
assert runner.graphs["cmd"] is g assert runner.graphs["cmd"] is g
@@ -136,69 +136,69 @@ class TestCliRunnerParser:
"""create_parser 应返回 ArgumentParser.""" """create_parser 应返回 ArgumentParser."""
from argparse import ArgumentParser from argparse import ArgumentParser
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
parser = runner.create_parser() parser = runner.create_parser()
assert isinstance(parser, ArgumentParser) assert isinstance(parser, ArgumentParser)
def test_parser_has_command_argument(self) -> None: def test_parser_has_command_argument(self) -> None:
"""解析器应有 command 位置参数.""" """解析器应有 command 位置参数."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
parser = runner.create_parser() parser = runner.create_parser()
parsed = parser.parse_args(["clean"]) parsed = parser.parse_args(["clean"])
assert parsed.command == "clean" assert parsed.command == "clean"
def test_parser_command_is_optional(self) -> None: def test_parser_command_is_optional(self) -> None:
"""command 应为可选参数.""" """command 应为可选参数."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
parser = runner.create_parser() parser = runner.create_parser()
parsed = parser.parse_args([]) parsed = parser.parse_args([])
assert parsed.command is None assert parsed.command is None
def test_parser_has_strategy_option(self) -> None: def test_parser_has_strategy_option(self) -> None:
"""解析器应有 --strategy 选项.""" """解析器应有 --strategy 选项."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
parser = runner.create_parser() parser = runner.create_parser()
parsed = parser.parse_args(["clean", "--strategy", "thread"]) parsed = parser.parse_args(["clean", "--strategy", "thread"])
assert parsed.strategy == "thread" assert parsed.strategy == "thread"
def test_parser_strategy_default(self) -> None: def test_parser_strategy_default(self) -> None:
"""--strategy 默认值应与构造时一致.""" """--strategy 默认值应与构造时一致."""
runner = px.CliRunner({"clean": _echo_graph()}, strategy="async") runner = px.CliRunner(aliases={"clean": _echo_graph()}, strategy="async")
parser = runner.create_parser() parser = runner.create_parser()
parsed = parser.parse_args(["clean"]) parsed = parser.parse_args(["clean"])
assert parsed.strategy == "async" assert parsed.strategy == "async"
def test_parser_has_dry_run_flag(self) -> None: def test_parser_has_dry_run_flag(self) -> None:
"""解析器应有 --dry-run 标志.""" """解析器应有 --dry-run 标志."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
parser = runner.create_parser() parser = runner.create_parser()
parsed = parser.parse_args(["clean", "--dry-run"]) parsed = parser.parse_args(["clean", "--dry-run"])
assert parsed.dry_run is True assert parsed.dry_run is True
def test_parser_dry_run_default_false(self) -> None: def test_parser_dry_run_default_false(self) -> None:
"""--dry-run 默认为 False.""" """--dry-run 默认为 False."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
parser = runner.create_parser() parser = runner.create_parser()
parsed = parser.parse_args(["clean"]) parsed = parser.parse_args(["clean"])
assert parsed.dry_run is False assert parsed.dry_run is False
def test_parser_has_list_flag(self) -> None: def test_parser_has_list_flag(self) -> None:
"""解析器应有 --list 标志.""" """解析器应有 --list 标志."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
parser = runner.create_parser() parser = runner.create_parser()
parsed = parser.parse_args(["--list"]) parsed = parser.parse_args(["--list"])
assert parsed.list is True assert parsed.list is True
def test_parser_has_quiet_flag(self) -> None: def test_parser_has_quiet_flag(self) -> None:
"""解析器应有 --quiet 标志.""" """解析器应有 --quiet 标志."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
parser = runner.create_parser() parser = runner.create_parser()
parsed = parser.parse_args(["clean", "--quiet"]) parsed = parser.parse_args(["clean", "--quiet"])
assert parsed.quiet is True assert parsed.quiet is True
def test_parser_quiet_default_false(self) -> None: def test_parser_quiet_default_false(self) -> None:
"""--quiet 默认为 False.""" """--quiet 默认为 False."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
parser = runner.create_parser() parser = runner.create_parser()
parsed = parser.parse_args(["clean"]) parsed = parser.parse_args(["clean"])
assert parsed.quiet is False assert parsed.quiet is False
@@ -222,7 +222,7 @@ class TestCliRunnerRunSuccess:
def test_run_valid_command_returns_zero(self) -> None: def test_run_valid_command_returns_zero(self) -> None:
"""有效命令执行成功应返回 0.""" """有效命令执行成功应返回 0."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
exit_code = runner.run(["clean"]) exit_code = runner.run(["clean"])
assert exit_code == CliExitCode.SUCCESS.value assert exit_code == CliExitCode.SUCCESS.value
@@ -236,28 +236,30 @@ class TestCliRunnerRunSuccess:
def track_b() -> None: def track_b() -> None:
executed.append("b") executed.append("b")
runner = px.CliRunner({ runner = px.CliRunner(
"a": px.Graph.from_specs([px.TaskSpec("a", track_a)]), aliases={
"b": px.Graph.from_specs([px.TaskSpec("b", track_b)]), "a": px.Graph.from_specs([px.TaskSpec("a", track_a)]),
}) "b": px.Graph.from_specs([px.TaskSpec("b", track_b)]),
}
)
_ = runner.run(["b"]) _ = runner.run(["b"])
assert executed == ["b"] assert executed == ["b"]
def test_run_multi_task_graph(self) -> None: def test_run_multi_task_graph(self) -> None:
"""应能执行带依赖的多任务图.""" """应能执行带依赖的多任务图."""
runner = px.CliRunner({"multi": _multi_task_graph()}) runner = px.CliRunner(aliases={"multi": _multi_task_graph()})
exit_code = runner.run(["multi"]) exit_code = runner.run(["multi"])
assert exit_code == CliExitCode.SUCCESS.value assert exit_code == CliExitCode.SUCCESS.value
def test_run_with_strategy_override(self) -> None: def test_run_with_strategy_override(self) -> None:
"""应支持通过 --strategy 覆盖默认策略.""" """应支持通过 --strategy 覆盖默认策略."""
runner = px.CliRunner({"echo": _echo_graph()}) runner = px.CliRunner(aliases={"echo": _echo_graph()})
exit_code = runner.run(["echo", "--strategy", "thread"]) exit_code = runner.run(["echo", "--strategy", "thread"])
assert exit_code == CliExitCode.SUCCESS.value assert exit_code == CliExitCode.SUCCESS.value
def test_run_with_dry_run(self, capsys: pytest.CaptureFixture[str]) -> None: def test_run_with_dry_run(self, capsys: pytest.CaptureFixture[str]) -> None:
"""--dry-run 应只打印计划不执行.""" """--dry-run 应只打印计划不执行."""
runner = px.CliRunner({"echo": _echo_graph()}) runner = px.CliRunner(aliases={"echo": _echo_graph()})
exit_code = runner.run(["echo", "--dry-run"]) exit_code = runner.run(["echo", "--dry-run"])
assert exit_code == CliExitCode.SUCCESS.value assert exit_code == CliExitCode.SUCCESS.value
captured = capsys.readouterr() captured = capsys.readouterr()
@@ -272,7 +274,7 @@ class TestCliRunnerVerbose:
def test_verbose_default_prints_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None: def test_verbose_default_prints_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None:
"""默认 verbose=True 应打印任务生命周期.""" """默认 verbose=True 应打印任务生命周期."""
runner = px.CliRunner({"echo": _echo_graph()}) runner = px.CliRunner(aliases={"echo": _echo_graph()})
_ = runner.run(["echo"]) _ = runner.run(["echo"])
captured = capsys.readouterr() captured = capsys.readouterr()
# verbose 模式下应打印任务生命周期 # verbose 模式下应打印任务生命周期
@@ -280,7 +282,7 @@ class TestCliRunnerVerbose:
def test_quiet_flag_disables_verbose(self, capsys: pytest.CaptureFixture[str]) -> None: def test_quiet_flag_disables_verbose(self, capsys: pytest.CaptureFixture[str]) -> None:
"""--quiet 应关闭 verbose 输出.""" """--quiet 应关闭 verbose 输出."""
runner = px.CliRunner({"echo": _echo_graph()}) runner = px.CliRunner(aliases={"echo": _echo_graph()})
_ = runner.run(["echo", "--quiet"]) _ = runner.run(["echo", "--quiet"])
captured = capsys.readouterr() captured = capsys.readouterr()
# quiet 模式下不应有 [verbose] 前缀的输出 # quiet 模式下不应有 [verbose] 前缀的输出
@@ -288,14 +290,14 @@ class TestCliRunnerVerbose:
def test_verbose_false_constructor_disables_verbose(self, capsys: pytest.CaptureFixture[str]) -> None: def test_verbose_false_constructor_disables_verbose(self, capsys: pytest.CaptureFixture[str]) -> None:
"""构造时 verbose=False 应关闭 verbose 输出.""" """构造时 verbose=False 应关闭 verbose 输出."""
runner = px.CliRunner({"echo": _echo_graph()}, verbose=False) runner = px.CliRunner(aliases={"echo": _echo_graph()}, verbose=False)
_ = runner.run(["echo"]) _ = runner.run(["echo"])
captured = capsys.readouterr() captured = capsys.readouterr()
assert "[verbose]" not in captured.out assert "[verbose]" not in captured.out
def test_verbose_prints_command_for_cmd_task(self, capsys: pytest.CaptureFixture[str]) -> None: def test_verbose_prints_command_for_cmd_task(self, capsys: pytest.CaptureFixture[str]) -> None:
"""verbose 模式下 cmd 任务应打印执行的命令.""" """verbose 模式下 cmd 任务应打印执行的命令."""
runner = px.CliRunner({"echo": _echo_graph(msg="verbose-test")}) runner = px.CliRunner(aliases={"echo": _echo_graph(msg="verbose-test")})
_ = runner.run(["echo"]) _ = runner.run(["echo"])
captured = capsys.readouterr() captured = capsys.readouterr()
# 应打印执行的命令 # 应打印执行的命令
@@ -305,7 +307,7 @@ class TestCliRunnerVerbose:
def test_verbose_prints_success_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None: def test_verbose_prints_success_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None:
"""verbose 模式下成功任务应打印成功信息.""" """verbose 模式下成功任务应打印成功信息."""
runner = px.CliRunner({"echo": _echo_graph()}) runner = px.CliRunner(aliases={"echo": _echo_graph()})
_ = runner.run(["echo"]) _ = runner.run(["echo"])
captured = capsys.readouterr() captured = capsys.readouterr()
assert "成功" in captured.out assert "成功" in captured.out
@@ -319,14 +321,14 @@ class TestCliRunnerVerbose:
conditions=(lambda _ctx: False,), conditions=(lambda _ctx: False,),
), ),
]) ])
runner = px.CliRunner({"skip": graph}) runner = px.CliRunner(aliases={"skip": graph})
_ = runner.run(["skip"]) _ = runner.run(["skip"])
captured = capsys.readouterr() captured = capsys.readouterr()
assert "跳过" in captured.out assert "跳过" in captured.out
def test_verbose_prints_failure_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None: def test_verbose_prints_failure_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None:
"""verbose 模式下失败任务应打印失败信息.""" """verbose 模式下失败任务应打印失败信息."""
runner = px.CliRunner({"fail": _failing_graph()}) runner = px.CliRunner(aliases={"fail": _failing_graph()})
_ = runner.run(["fail"]) _ = runner.run(["fail"])
captured = capsys.readouterr() captured = capsys.readouterr()
# 失败信息可能出现在 stdout (verbose) 或 stderr (PyFlowXError) # 失败信息可能出现在 stdout (verbose) 或 stderr (PyFlowXError)
@@ -342,7 +344,7 @@ class TestCliRunnerRunFailure:
def test_run_unknown_command_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None: def test_run_unknown_command_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None:
"""未知命令应返回 1 并打印错误.""" """未知命令应返回 1 并打印错误."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
exit_code = runner.run(["unknown"]) exit_code = runner.run(["unknown"])
assert exit_code == CliExitCode.FAILURE.value assert exit_code == CliExitCode.FAILURE.value
captured = capsys.readouterr() captured = capsys.readouterr()
@@ -351,7 +353,7 @@ class TestCliRunnerRunFailure:
def test_run_no_command_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None: def test_run_no_command_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None:
"""无命令时应返回 1 并打印帮助.""" """无命令时应返回 1 并打印帮助."""
runner = px.CliRunner({"clean": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph()})
exit_code = runner.run([]) exit_code = runner.run([])
assert exit_code == CliExitCode.FAILURE.value assert exit_code == CliExitCode.FAILURE.value
captured = capsys.readouterr() captured = capsys.readouterr()
@@ -359,13 +361,13 @@ class TestCliRunnerRunFailure:
def test_run_failing_task_returns_failure(self) -> None: def test_run_failing_task_returns_failure(self) -> None:
"""任务失败时应返回 1.""" """任务失败时应返回 1."""
runner = px.CliRunner({"fail": _failing_graph()}) runner = px.CliRunner(aliases={"fail": _failing_graph()})
exit_code = runner.run(["fail"]) exit_code = runner.run(["fail"])
assert exit_code == CliExitCode.FAILURE.value assert exit_code == CliExitCode.FAILURE.value
def test_run_failing_task_prints_error(self, capsys: pytest.CaptureFixture[str]) -> None: def test_run_failing_task_prints_error(self, capsys: pytest.CaptureFixture[str]) -> None:
"""任务失败时应打印错误信息.""" """任务失败时应打印错误信息."""
runner = px.CliRunner({"fail": _failing_graph()}) runner = px.CliRunner(aliases={"fail": _failing_graph()})
_ = runner.run(["fail"]) _ = runner.run(["fail"])
captured = capsys.readouterr() captured = capsys.readouterr()
# PyFlowXError 信息应输出到 stderr # PyFlowXError 信息应输出到 stderr
@@ -380,17 +382,19 @@ class TestCliRunnerList:
def test_list_returns_success(self) -> None: def test_list_returns_success(self) -> None:
"""--list 应返回 0.""" """--list 应返回 0."""
runner = px.CliRunner({"clean": _echo_graph(), "build": _echo_graph()}) runner = px.CliRunner(aliases={"clean": _echo_graph(), "build": _echo_graph()})
exit_code = runner.run(["--list"]) exit_code = runner.run(["--list"])
assert exit_code == CliExitCode.SUCCESS.value assert exit_code == CliExitCode.SUCCESS.value
def test_list_prints_all_commands(self, capsys: pytest.CaptureFixture[str]) -> None: def test_list_prints_all_commands(self, capsys: pytest.CaptureFixture[str]) -> None:
"""--list 应打印所有命令.""" """--list 应打印所有命令."""
runner = px.CliRunner({ runner = px.CliRunner(
"clean": _echo_graph("c", "clean"), aliases={
"build": _echo_graph("b", "build"), "clean": _echo_graph("c", "clean"),
"test": _echo_graph("t", "test"), "build": _echo_graph("b", "build"),
}) "test": _echo_graph("t", "test"),
}
)
_ = runner.run(["--list"]) _ = runner.run(["--list"])
captured = capsys.readouterr() captured = capsys.readouterr()
assert "clean" in captured.out assert "clean" in captured.out
@@ -404,7 +408,7 @@ class TestCliRunnerList:
def track() -> None: def track() -> None:
executed.append("ran") executed.append("ran")
runner = px.CliRunner({"a": px.Graph.from_specs([px.TaskSpec("a", track)])}) runner = px.CliRunner(aliases={"a": px.Graph.from_specs([px.TaskSpec("a", track)])})
_ = runner.run(["--list"]) _ = runner.run(["--list"])
assert executed == [] assert executed == []
@@ -417,7 +421,7 @@ class TestCliRunnerErrorHandling:
def test_keyboard_interrupt_returns_130(self, capsys: pytest.CaptureFixture[str]) -> None: def test_keyboard_interrupt_returns_130(self, capsys: pytest.CaptureFixture[str]) -> None:
"""KeyboardInterrupt 应返回 130.""" """KeyboardInterrupt 应返回 130."""
runner = px.CliRunner({"echo": _echo_graph()}) runner = px.CliRunner(aliases={"echo": _echo_graph()})
def raise_interrupt(*_args: Any, **_kwargs: Any) -> None: def raise_interrupt(*_args: Any, **_kwargs: Any) -> None:
raise KeyboardInterrupt raise KeyboardInterrupt
@@ -430,7 +434,7 @@ class TestCliRunnerErrorHandling:
def test_pyflowx_error_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None: def test_pyflowx_error_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None:
"""PyFlowXError 应返回 1.""" """PyFlowXError 应返回 1."""
runner = px.CliRunner({"echo": _echo_graph()}) runner = px.CliRunner(aliases={"echo": _echo_graph()})
def raise_error(*_args: Any, **_kwargs: Any) -> None: def raise_error(*_args: Any, **_kwargs: Any) -> None:
raise TaskFailedError("echo", RuntimeError("boom"), 1) raise TaskFailedError("echo", RuntimeError("boom"), 1)
@@ -447,7 +451,7 @@ class TestCliRunnerErrorHandling:
class CustomError(Exception): class CustomError(Exception):
pass pass
runner = px.CliRunner({"echo": _echo_graph()}) runner = px.CliRunner(aliases={"echo": _echo_graph()})
def raise_custom(*_args: Any, **_kwargs: Any) -> None: def raise_custom(*_args: Any, **_kwargs: Any) -> None:
raise CustomError("unexpected") raise CustomError("unexpected")
@@ -464,14 +468,14 @@ class TestCliRunnerRunCli:
def test_run_cli_calls_sys_exit(self) -> None: def test_run_cli_calls_sys_exit(self) -> None:
"""run_cli 应调用 sys.exit.""" """run_cli 应调用 sys.exit."""
runner = px.CliRunner({"echo": _echo_graph()}) runner = px.CliRunner(aliases={"echo": _echo_graph()})
with pytest.raises(SystemExit) as exc_info: with pytest.raises(SystemExit) as exc_info:
runner.run_cli(["echo"]) runner.run_cli(["echo"])
assert exc_info.value.code == CliExitCode.SUCCESS.value assert exc_info.value.code == CliExitCode.SUCCESS.value
def test_run_cli_exit_code_on_failure(self) -> None: def test_run_cli_exit_code_on_failure(self) -> None:
"""run_cli 失败时应以非零码退出.""" """run_cli 失败时应以非零码退出."""
runner = px.CliRunner({"fail": _failing_graph()}) runner = px.CliRunner(aliases={"fail": _failing_graph()})
with pytest.raises(SystemExit) as exc_info: with pytest.raises(SystemExit) as exc_info:
runner.run_cli(["fail"]) runner.run_cli(["fail"])
assert exc_info.value.code == CliExitCode.FAILURE.value assert exc_info.value.code == CliExitCode.FAILURE.value
@@ -479,7 +483,7 @@ class TestCliRunnerRunCli:
def test_run_cli_no_args_uses_sys_argv(self, monkeypatch: pytest.MonkeyPatch) -> None: def test_run_cli_no_args_uses_sys_argv(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""run_cli 无参数时应使用 sys.argv.""" """run_cli 无参数时应使用 sys.argv."""
monkeypatch.setattr(sys, "argv", ["pymake", "echo"]) monkeypatch.setattr(sys, "argv", ["pymake", "echo"])
runner = px.CliRunner({"echo": _echo_graph()}) runner = px.CliRunner(aliases={"echo": _echo_graph()})
with pytest.raises(SystemExit) as exc_info: with pytest.raises(SystemExit) as exc_info:
runner.run_cli() runner.run_cli()
assert exc_info.value.code == CliExitCode.SUCCESS.value assert exc_info.value.code == CliExitCode.SUCCESS.value
@@ -520,7 +524,7 @@ class TestCliRunnerIntegration:
conditions=(lambda _ctx: False,), conditions=(lambda _ctx: False,),
), ),
]) ])
runner = px.CliRunner({"skip": graph}) runner = px.CliRunner(aliases={"skip": graph})
exit_code = runner.run(["skip"]) exit_code = runner.run(["skip"])
assert exit_code == CliExitCode.SUCCESS.value assert exit_code == CliExitCode.SUCCESS.value
@@ -533,7 +537,7 @@ class TestCliRunnerIntegration:
conditions=(lambda _ctx: True,), conditions=(lambda _ctx: True,),
), ),
]) ])
runner = px.CliRunner({"run": graph}) runner = px.CliRunner(aliases={"run": graph})
exit_code = runner.run(["run"]) exit_code = runner.run(["run"])
assert exit_code == CliExitCode.SUCCESS.value assert exit_code == CliExitCode.SUCCESS.value
@@ -554,17 +558,19 @@ class TestCliRunnerIntegration:
px.TaskSpec("c", make("c"), depends_on=("a",)), px.TaskSpec("c", make("c"), depends_on=("a",)),
px.TaskSpec("d", make("d"), depends_on=("b", "c")), px.TaskSpec("d", make("d"), depends_on=("b", "c")),
]) ])
runner = px.CliRunner({"diamond": graph}) runner = px.CliRunner(aliases={"diamond": graph})
exit_code = runner.run(["diamond"]) exit_code = runner.run(["diamond"])
assert exit_code == CliExitCode.SUCCESS.value assert exit_code == CliExitCode.SUCCESS.value
assert order == ["a", "b", "c", "d"] assert order == ["a", "b", "c", "d"]
def test_mixed_fn_and_cmd_commands(self) -> None: def test_mixed_fn_and_cmd_commands(self) -> None:
"""混合 fn 和 cmd 的命令应都能执行.""" """混合 fn 和 cmd 的命令应都能执行."""
runner = px.CliRunner({ runner = px.CliRunner(
"fn_cmd": px.Graph.from_specs([px.TaskSpec("fn", fn=lambda: "fn-result")]), aliases={
"cmd_cmd": px.Graph.from_specs([px.TaskSpec("cmd", cmd=[*ECHO_CMD, "cmd-result"])]), "fn_cmd": px.Graph.from_specs([px.TaskSpec("fn", fn=lambda: "fn-result")]),
}) "cmd_cmd": px.Graph.from_specs([px.TaskSpec("cmd", cmd=[*ECHO_CMD, "cmd-result"])]),
}
)
assert runner.run(["fn_cmd"]) == CliExitCode.SUCCESS.value assert runner.run(["fn_cmd"]) == CliExitCode.SUCCESS.value
assert runner.run(["cmd_cmd"]) == CliExitCode.SUCCESS.value assert runner.run(["cmd_cmd"]) == CliExitCode.SUCCESS.value
@@ -580,7 +586,7 @@ class TestCliRunnerIntegration:
ls_cmd = ["ls"] ls_cmd = ["ls"]
graph = px.Graph.from_specs([px.TaskSpec("ls", cmd=ls_cmd, cwd=Path(tmpdir))]) graph = px.Graph.from_specs([px.TaskSpec("ls", cmd=ls_cmd, cwd=Path(tmpdir))])
runner = px.CliRunner({"ls": graph}) runner = px.CliRunner(aliases={"ls": graph})
exit_code = runner.run(["ls"]) exit_code = runner.run(["ls"])
assert exit_code == CliExitCode.SUCCESS.value assert exit_code == CliExitCode.SUCCESS.value
@@ -612,3 +618,109 @@ class TestApplyVerboseToGraph:
new_graph = _apply_verbose_to_graph(graph, verbose=True) new_graph = _apply_verbose_to_graph(graph, verbose=True)
new_spec = new_graph.spec("a") new_spec = new_graph.spec("a")
assert new_spec.verbose is True assert new_spec.verbose is True
# ---------------------------------------------------------------------- #
# 新 API: tasks + aliases
# ---------------------------------------------------------------------- #
class TestCliRunnerNewApi:
"""测试 CliRunner 的 tasks + aliases 新 API."""
def test_tasks_plus_aliases_single_str(self) -> None:
"""tasks 注册 + aliases str 引用单任务."""
runner = px.CliRunner(
tasks=[px.cmd([*ECHO_CMD, "a"], name="task_a")],
aliases={"a": "task_a"},
)
assert runner.commands == ["a"]
assert runner.run(["a"]) == CliExitCode.SUCCESS.value
def test_aliases_list_str_builds_chain(self) -> None:
"""aliases list[str] 应建立 chain 依赖(后一个依赖前一个)."""
runner = px.CliRunner(
tasks=[
px.cmd([*ECHO_CMD, "a"], name="task_a"),
px.cmd([*ECHO_CMD, "b"], name="task_b"),
],
aliases={"ab": ["task_a", "task_b"]},
)
graph = runner.graphs["ab"]
specs = graph.all_specs()
assert specs["task_b"].depends_on == ("task_a",)
def test_aliases_taskspec_value(self) -> None:
"""aliases 值为 TaskSpec 时直接生成单任务图."""
spec = px.cmd([*ECHO_CMD, "x"], name="inline_x")
runner = px.CliRunner(aliases={"x": spec})
assert runner.run(["x"]) == CliExitCode.SUCCESS.value
def test_aliases_graph_value(self) -> None:
"""aliases 值为 Graph 时原样使用(复杂场景:conditions 等)."""
graph = px.Graph.from_specs([
px.TaskSpec("a", cmd=[*ECHO_CMD, "a"]),
px.TaskSpec("b", cmd=[*ECHO_CMD, "b"], depends_on=("a",)),
])
runner = px.CliRunner(aliases={"g": graph})
assert set(runner.graphs["g"].all_specs().keys()) == {"a", "b"}
def test_alias_name_same_as_task_name_via_taskspec(self) -> None:
"""alias 名与 task 名相同时,用 TaskSpec 避免自引用循环."""
spec = px.cmd([*ECHO_CMD, "same"], name="same")
runner = px.CliRunner(aliases={"same": spec})
assert runner.run(["same"]) == CliExitCode.SUCCESS.value
def test_alias_str_reference_to_other_alias(self) -> None:
"""alias 值为 str 引用其他 alias."""
runner = px.CliRunner(
aliases={
"base": px.cmd([*ECHO_CMD, "base"], name="base"),
"wrapper": "base",
},
)
assert runner.run(["wrapper"]) == CliExitCode.SUCCESS.value
def test_empty_aliases_raises(self) -> None:
"""空 aliases 应抛 ValueError."""
with pytest.raises(ValueError, match="至少需要一个别名"):
_ = px.CliRunner()
def test_empty_list_value_raises(self) -> None:
"""空 list 作为 alias 值应抛 ValueError."""
with pytest.raises(ValueError, match="任务列表为空"):
_ = px.CliRunner(aliases={"x": []})
def test_invalid_value_type_raises(self) -> None:
"""无效类型(int)作为 alias 值应抛 TypeError."""
with pytest.raises(TypeError, match="值类型无效"):
_ = px.CliRunner(aliases={"x": 123}) # type: ignore[dict-item]
def test_invalid_list_element_type_raises(self) -> None:
"""list 中非 str/TaskSpec 元素应抛 TypeError."""
with pytest.raises(TypeError, match="列表元素类型无效"):
_ = px.CliRunner(aliases={"x": [123]}) # type: ignore[list-item]
def test_duplicate_task_name_raises(self) -> None:
"""tasks 中重名任务应抛 ValueError."""
spec = px.cmd([*ECHO_CMD, "a"], name="dup")
with pytest.raises(ValueError, match="任务名重复"):
_ = px.CliRunner(tasks=[spec, spec], aliases={"a": "dup"})
def test_commands_excludes_unreferenced_tasks(self) -> None:
"""commands 只含 aliases,不含 tasks 中未引用的任务."""
runner = px.CliRunner(
tasks=[
px.cmd([*ECHO_CMD, "a"], name="used"),
px.cmd([*ECHO_CMD, "b"], name="unused"),
],
aliases={"a": "used"},
)
assert runner.commands == ["a"]
def test_unknown_command_rejected(self) -> None:
"""未注册的 alias 名应被拒绝(不接受裸 task 名)."""
runner = px.CliRunner(
tasks=[px.cmd([*ECHO_CMD, "a"], name="task_a")],
aliases={"a": "task_a"},
)
# task_a 是任务名,不是 alias,应被拒绝
assert runner.run(["task_a"]) == CliExitCode.FAILURE.value
+36
View File
@@ -14,6 +14,7 @@ from pyflowx.task import (
TaskSpec, TaskSpec,
TaskStatus, TaskStatus,
_env_and_cwd, _env_and_cwd,
cmd,
task_template, task_template,
) )
@@ -78,6 +79,41 @@ def test_retry_policy_negative_jitter_rejected() -> None:
RetryPolicy(jitter=-1) RetryPolicy(jitter=-1)
# ---------------------------------------------------------------------- #
# cmd() 工厂
# ---------------------------------------------------------------------- #
def test_cmd_factory_default_name_from_two_elements() -> None:
"""cmd() 默认 name = '_'.join(command[:2])."""
spec = cmd(["uv", "build"])
assert spec.name == "uv_build"
assert spec.cmd == ["uv", "build"]
def test_cmd_factory_default_name_single_element() -> None:
"""cmd() 单元素命令 name = command[0]."""
spec = cmd(["ls"])
assert spec.name == "ls"
def test_cmd_factory_explicit_name() -> None:
"""cmd() 显式 name 覆盖默认推导."""
spec = cmd(["ruff", "check", "--fix"], name="lint")
assert spec.name == "lint"
def test_cmd_factory_passes_depends_on() -> None:
"""cmd() depends_on 透传给 TaskSpec."""
spec = cmd(["echo", "b"], name="b", depends_on=("a",))
assert spec.depends_on == ("a",)
def test_cmd_factory_passes_extra_kwargs() -> None:
"""cmd() 其余 kwargs 透传给 TaskSpec."""
spec = cmd(["echo", "x"], name="x", timeout=10.0, tags=("t1",))
assert spec.timeout == 10.0
assert spec.tags == ("t1",)
def test_retry_policy_retries_property() -> None: def test_retry_policy_retries_property() -> None:
policy = RetryPolicy(max_attempts=3) policy = RetryPolicy(max_attempts=3)
assert policy.retries == 2 assert policy.retries == 2