refactor: 重构执行器和CliRunner,简化策略类型实现

1.  将Strategy枚举改为Literal类型,移除normalize_strategy函数
2.  内联策略验证逻辑到run函数中
3.  使用dataclasses.field重构CliRunner的初始化方式
4.  修复测试用例中的函数名和调用方式不匹配问题
5.  调整部分测试用例的构造语法,适配新的API
6.  修正pymake模块中的函数重命名和条件变量命名问题
7.  为部分耗时测试添加@pytest.mark.slow标记
This commit is contained in:
2026-06-21 12:52:32 +08:00
parent 4884fd53e5
commit 179e5b3811
9 changed files with 167 additions and 299 deletions
+36 -38
View File
@@ -49,7 +49,7 @@ class PymakeConfig:
conf = PymakeConfig()
def _get_maturin_build_command() -> list[str]:
def get_maturin_build_command() -> list[str]:
"""获取 maturin 构建命令(根据平台自动添加参数).
Returns
@@ -64,13 +64,13 @@ def _get_maturin_build_command() -> list[str]:
# 命令条件判断
_MATURIN_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.MATURIN_TOOL)
_PYTEST_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("pytest")
_UV_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.BUILD_TOOL)
_HATCH_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("hatch")
_RUFF_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("ruff")
_GIT_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("git")
_TOX_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("tox")
MATURIN_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.MATURIN_TOOL)
PYTEST_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("pytest")
UV_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.BUILD_TOOL)
HATCH_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("hatch")
RUFF_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("ruff")
GIT_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("git")
TOX_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("tox")
def build_graphs() -> dict[str, px.Graph]:
@@ -87,7 +87,7 @@ def build_graphs() -> dict[str, px.Graph]:
px.TaskSpec(
"uv_build",
cmd=conf.BUILD_COMMAND,
conditions=(_UV_CONDITION,),
conditions=(UV_CONDITION,),
timeout=conf.TIMEOUT,
),
]
@@ -97,9 +97,9 @@ def build_graphs() -> dict[str, px.Graph]:
[
px.TaskSpec(
"maturin_build",
cmd=_get_maturin_build_command(),
cmd=get_maturin_build_command(),
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
conditions=(MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
]
@@ -109,15 +109,15 @@ def build_graphs() -> dict[str, px.Graph]:
[
px.TaskSpec(
"maturin_build",
cmd=_get_maturin_build_command(),
cmd=get_maturin_build_command(),
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
conditions=(MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
px.TaskSpec(
"uv_build",
cmd=conf.BUILD_COMMAND,
conditions=(_UV_CONDITION,),
conditions=(UV_CONDITION,),
timeout=conf.TIMEOUT,
depends_on=("maturin_build",),
),
@@ -131,7 +131,7 @@ def build_graphs() -> dict[str, px.Graph]:
"maturin_dev",
cmd=conf.MATURIN_DEV_COMMAND,
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
conditions=(MATURIN_CONDITION,),
),
]
),
@@ -141,7 +141,7 @@ def build_graphs() -> dict[str, px.Graph]:
px.TaskSpec(
"uv_install",
cmd=["uv", "pip", "install", "-e", "."],
conditions=(_UV_CONDITION,),
conditions=(UV_CONDITION,),
),
]
),
@@ -152,12 +152,12 @@ def build_graphs() -> dict[str, px.Graph]:
"maturin_dev",
cmd=conf.MATURIN_DEV_COMMAND,
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
conditions=(MATURIN_CONDITION,),
),
px.TaskSpec(
"uv_install",
cmd=["uv", "pip", "install", "-e", "."],
conditions=(_UV_CONDITION,),
conditions=(UV_CONDITION,),
depends_on=("maturin_dev",),
),
]
@@ -169,7 +169,7 @@ def build_graphs() -> dict[str, px.Graph]:
px.TaskSpec(
"git_clean_python",
cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE],
conditions=(_GIT_CONDITION,),
conditions=(GIT_CONDITION,),
),
]
),
@@ -180,7 +180,7 @@ def build_graphs() -> dict[str, px.Graph]:
"cargo_clean",
cmd=["cargo", "clean"],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
conditions=(MATURIN_CONDITION,),
),
]
),
@@ -191,12 +191,12 @@ def build_graphs() -> dict[str, px.Graph]:
"cargo_clean",
cmd=["cargo", "clean"],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
conditions=(MATURIN_CONDITION,),
),
px.TaskSpec(
"git_clean",
cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE],
conditions=(_GIT_CONDITION,),
conditions=(GIT_CONDITION,),
),
]
),
@@ -217,7 +217,7 @@ def build_graphs() -> dict[str, px.Graph]:
"--color=yes",
"--durations=10",
],
conditions=(_PYTEST_CONDITION,),
conditions=(PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]
@@ -236,7 +236,7 @@ def build_graphs() -> dict[str, px.Graph]:
"--color=yes",
"--durations=10",
],
conditions=(_PYTEST_CONDITION,),
conditions=(PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]
@@ -248,8 +248,6 @@ def build_graphs() -> dict[str, px.Graph]:
"pytest_cov",
cmd=[
"pytest",
"-m",
"not slow",
"--cov",
"-n",
"auto",
@@ -260,7 +258,7 @@ def build_graphs() -> dict[str, px.Graph]:
"--color=yes",
"--durations=10",
],
conditions=(_PYTEST_CONDITION,),
conditions=(PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]
@@ -276,7 +274,7 @@ def build_graphs() -> dict[str, px.Graph]:
"--fix",
"--unsafe-fixes",
],
conditions=(_RUFF_CONDITION,),
conditions=(RUFF_CONDITION,),
timeout=conf.TIMEOUT,
cwd=Path(conf.PROJECT_ROOT),
),
@@ -312,7 +310,7 @@ def build_graphs() -> dict[str, px.Graph]:
"publish_python",
cmd=["hatch", "publish"],
cwd=Path(conf.PROJECT_ROOT),
conditions=(_HATCH_CONDITION,),
conditions=(HATCH_CONDITION,),
timeout=conf.TIMEOUT,
),
]
@@ -329,14 +327,14 @@ def build_graphs() -> dict[str, px.Graph]:
conf.CORE_PATTERN,
],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
conditions=(MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
px.TaskSpec(
"publish_python",
cmd=["hatch", "publish"],
cwd=Path(conf.PROJECT_ROOT),
conditions=(_HATCH_CONDITION,),
conditions=(HATCH_CONDITION,),
timeout=conf.TIMEOUT,
depends_on=("publish_rust",),
),
@@ -349,7 +347,7 @@ def build_graphs() -> dict[str, px.Graph]:
"publish_rust",
cmd=["maturin", "publish"],
cwd=Path(conf.CORE_DIR),
conditions=(_MATURIN_CONDITION,),
conditions=(MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
]
@@ -361,13 +359,13 @@ def build_graphs() -> dict[str, px.Graph]:
px.TaskSpec(
"tox_run",
cmd=["tox", "-p", "auto"],
conditions=(_TOX_CONDITION,),
conditions=(TOX_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 安装多版本 Python (仅安装不测试)
"tox-install": px.Graph.from_specs(
"tox_install": px.Graph.from_specs(
[
px.TaskSpec(
"uv_python_install",
@@ -383,7 +381,7 @@ def build_graphs() -> dict[str, px.Graph]:
"3.13",
"3.14",
],
conditions=(_UV_CONDITION,),
conditions=(UV_CONDITION,),
timeout=600,
),
]
@@ -421,7 +419,7 @@ def main():
🔬 多版本测试:
pymake tox - 多版本 Python 测试 (3.8-3.14)
pymake tox-install - 安装所有 Python 版本 (仅安装不测试)
pymake tox_install - 安装所有 Python 版本 (仅安装不测试)
📦 发布命令:
pymake pb - 发布到 PyPI (hatch publish)
@@ -445,8 +443,8 @@ def main():
pymake ca # 清理所有构建产物
"""
runner = px.CliRunner(
strategy=px.Strategy.SEQUENTIAL,
strategy="sequential",
description="PyMake - Python 构建工具 (替代 Makefile)",
**build_graphs(),
graphs=build_graphs(), # type: ignore[reportArgumentType]
)
runner.run_cli()
+10 -57
View File
@@ -16,11 +16,10 @@ from __future__ import annotations
import asyncio
import concurrent.futures
import enum
import inspect
import logging
from datetime import datetime
from typing import Any, Awaitable, Callable, Mapping, cast
from typing import Any, Awaitable, Callable, Literal, Mapping, cast
from .context import build_call_args, describe_injection
from .errors import TaskFailedError, TaskTimeoutError
@@ -33,56 +32,7 @@ logger = logging.getLogger("pyflowx")
# 观察者回调类型。
EventCallback = Callable[[TaskEvent], None]
class Strategy(enum.Enum):
"""任务图执行策略.
Members
-------
SEQUENTIAL
顺序执行: 逐个运行任务, 确定性最高, 适合调试.
THREAD
线程池执行: 层内任务通过线程池并发, 适合 I/O 密集型同步任务.
ASYNC
异步执行: 通过 ``asyncio.gather`` 实现层内并发, 适合 I/O 密集型异步任务.
"""
SEQUENTIAL = "sequential"
THREAD = "thread"
ASYNC = "async"
def normalize_strategy(strategy: str | Strategy) -> Strategy:
"""将字符串或 Strategy 归一化为 Strategy 枚举.
Parameters
----------
strategy : str | Strategy
策略值, 接受字符串 (``"sequential"`` / ``"thread"`` / ``"async"``)
或 :class:`Strategy` 枚举成员.
Returns
-------
Strategy
归一化后的枚举成员.
Raises
------
ValueError
策略不被识别时.
"""
if isinstance(strategy, Strategy):
return strategy
if isinstance(strategy, str):
try:
return Strategy(strategy)
except ValueError:
valid = ", ".join(repr(s.value) for s in Strategy)
raise ValueError(
f"unknown strategy {strategy!r}; expected one of {valid}."
) from None
raise TypeError(f"strategy must be str or Strategy, got {type(strategy).__name__}")
Strategy = Literal["sequential", "thread", "async"]
def _is_async_fn(spec: TaskSpec[object]) -> bool:
@@ -397,7 +347,7 @@ def _make_verbose_callback(
def run(
graph: Graph,
strategy: str | Strategy = Strategy.SEQUENTIAL,
strategy: Strategy = "sequential",
*,
max_workers: int | None = None,
dry_run: bool = False,
@@ -436,11 +386,14 @@ def run(
任何任务耗尽重试后仍失败时。运行在失败层中止;后续层的任务
不会被执行。
"""
normalized = normalize_strategy(strategy)
graph.validate()
layers = graph.layers()
# 验证策略是否有效
valid_strategies = ("sequential", "thread", "async")
if strategy not in valid_strategies:
raise ValueError(f"unknown strategy: {strategy}. Valid: {valid_strategies}")
if dry_run:
_print_dry_run(graph, layers)
return RunReport(success=True)
@@ -455,11 +408,11 @@ def run(
context: dict[str, Any] = {}
try:
if normalized == Strategy.SEQUENTIAL:
if strategy == "sequential":
_drive_sequential(
graph, layers, context, report, backend, effective_callback
)
elif normalized == Strategy.THREAD:
elif strategy == "thread":
_drive_threaded(
graph, layers, context, report, backend, effective_callback, max_workers
)
+23 -60
View File
@@ -1,13 +1,5 @@
"""命令行运行器:根据用户输入执行对应的任务流图.
参考 bitool_skill 的 MapSkill 设计, 将命令名映射到 Graph 实例,
通过 argparse 解析用户输入的命令并执行对应的图.
与 bitool_skill.MapSkill 的区别:
- MapSkill 通过继承 + create_scheduler_map 构建命令映射
- CliRunner 通过关键字参数直接注入命令到图的映射, 更声明式
- CliRunner 复用 pyflowx 的 DAG 调度能力 (run/Graph/TaskSpec)
verbose 模式
------------
``CliRunner`` 默认 ``verbose=True``, 会:
@@ -20,13 +12,13 @@ verbose 模式
from __future__ import annotations
import argparse
import dataclasses
import enum
import sys
from dataclasses import dataclass, field, replace
from typing import Sequence
from .errors import PyFlowXError
from .executors import Strategy, normalize_strategy, run
from .executors import Strategy, run
from .graph import Graph
from .task import TaskSpec
@@ -64,14 +56,15 @@ def _apply_verbose_to_graph(graph: Graph, verbose: bool) -> Graph:
if spec.verbose == verbose:
new_specs.append(spec)
else:
new_specs.append(dataclasses.replace(spec, verbose=verbose))
new_specs.append(replace(spec, verbose=verbose))
return Graph.from_specs(new_specs)
@dataclass
class CliRunner:
"""命令行运行器: 根据用户输入执行对应的任务流图.
参考 bitool_skill 的 MapSkill 设计, 将命令名映射到 Graph 实例.
将命令名映射到 Graph 实例.
通过 ``sys.argv`` 解析用户输入的命令, 执行对应的图.
Parameters
@@ -79,8 +72,6 @@ class CliRunner:
strategy : str | Strategy
默认执行策略 (``Strategy.SEQUENTIAL`` / ``Strategy.THREAD`` /
``Strategy.ASYNC`` 或对应字符串). 可被命令行 ``--strategy`` 覆盖.
description : str
CLI 描述文本, 显示在 ``--help`` 中.
verbose : bool
是否显示详细执行过程. ``True`` 时打印任务生命周期和 subprocess 输出.
默认 ``True``. 可被命令行 ``--quiet`` 关闭.
@@ -110,32 +101,24 @@ class CliRunner:
runner = px.CliRunner(
strategy=px.Strategy.THREAD,
description="My build tool",
test=px.Graph.from_specs([...]),
)
runner.run(["test", "--strategy", "sequential"])
"""
def __init__(
self,
*,
strategy: str | Strategy = Strategy.SEQUENTIAL,
description: str = "",
verbose: bool = True,
**graphs: Graph,
) -> None:
if not graphs:
graphs: dict[str, Graph] = field(default_factory=dict)
strategy: Strategy = field(default="sequential")
description: str = field(default_factory=str)
verbose: bool = field(default_factory=lambda: True)
def __post_init__(self) -> None:
if not self.graphs:
raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)")
# 校验所有值都是 Graph
for name, graph in graphs.items():
for name, graph in self.graphs.items():
if not isinstance(graph, Graph):
raise TypeError(
f"CliRunner 命令 {name!r} 的值必须是 Graph 实例, 实际是 {type(graph).__name__}"
)
self._graphs: dict[str, Graph] = dict(graphs)
self._strategy: Strategy = normalize_strategy(strategy)
self._description: str = description
self._verbose: bool = verbose
# ------------------------------------------------------------------ #
# 内省
@@ -143,27 +126,7 @@ class CliRunner:
@property
def commands(self) -> list[str]:
"""可用的命令列表 (按插入顺序)."""
return list(self._graphs.keys())
@property
def graphs(self) -> dict[str, Graph]:
"""命令名到图的映射 (只读副本)."""
return dict(self._graphs)
@property
def strategy(self) -> Strategy:
"""默认执行策略."""
return self._strategy
@property
def description(self) -> str:
"""CLI 描述文本."""
return self._description
@property
def verbose(self) -> bool:
"""是否显示详细执行过程."""
return self._verbose
return list(self.graphs.keys())
# ------------------------------------------------------------------ #
# 参数解析
@@ -188,7 +151,7 @@ class CliRunner:
"""
parser = argparse.ArgumentParser(
prog=self._prog_name(),
description=self._description or "PyFlowX CLI Runner",
description=self.description or "PyFlowX CLI Runner",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=self._format_commands_help(),
)
@@ -199,8 +162,8 @@ class CliRunner:
)
_ = parser.add_argument(
"--strategy",
choices=[s.value for s in Strategy],
default=self._strategy.value,
choices=list(Strategy.__args__),
default="sequential",
help="执行策略 (默认: %(default)s)",
)
_ = parser.add_argument(
@@ -223,7 +186,7 @@ class CliRunner:
def _format_commands_help(self) -> str:
"""格式化命令帮助文本."""
lines = ["可用命令:"]
for cmd in self._graphs:
for cmd in self.graphs:
lines.append(f" {cmd}")
return "\n".join(lines)
@@ -262,8 +225,8 @@ class CliRunner:
return CliExitCode.FAILURE.value
# 验证命令
if parsed.command not in self._graphs:
available = ", ".join(self._graphs.keys())
if parsed.command not in self.graphs:
available = ", ".join(self.graphs.keys())
print(
f"错误: 未知命令 {parsed.command!r} (可用命令: {available})",
file=sys.stderr,
@@ -271,10 +234,10 @@ class CliRunner:
return CliExitCode.FAILURE.value
# 确定是否 verbose: --quiet 覆盖默认值
verbose = self._verbose and not parsed.quiet
verbose = self.verbose and not parsed.quiet
# 对图应用 verbose 设置 (重建带 verbose 标记的 spec)
graph = self._graphs[parsed.command]
graph = self.graphs[parsed.command]
if verbose:
graph = _apply_verbose_to_graph(graph, verbose=True)