~clirunner

This commit is contained in:
2026-06-20 17:13:18 +08:00
parent e00868e3b1
commit 6d4b5e4a1f
10 changed files with 194 additions and 143 deletions
+2 -1
View File
@@ -403,7 +403,8 @@ def main():
pymake ca # 清理所有构建产物
"""
runner = px.CliRunner(
strategy=px.Strategy.SEQUENTIAL,
description="PyMake - Python 构建工具 (替代 Makefile)",
**_build_graphs(),
graphs=**_build_graphs(),
)
runner.run_cli()
+1 -3
View File
@@ -84,9 +84,7 @@ def build_call_args(
)
# 与本任务相关的上下文子集。
dep_context: Dict[str, Any] = {
name: context[name] for name in spec.depends_on if name in context
}
dep_context: Dict[str, Any] = {name: context[name] for name in spec.depends_on if name in context}
# 检测静态 kwargs 与依赖名的冲突。
collisions = set(spec.kwargs) & set(dep_context)
+1 -3
View File
@@ -58,9 +58,7 @@ class TaskFailedError(PyFlowXError):
layer: Optional[int] = None,
) -> None:
location = f" (layer {layer})" if layer is not None else ""
super().__init__(
f"Task '{task}' failed after {attempts} attempt(s){location}: {cause}"
)
super().__init__(f"Task '{task}' failed after {attempts} attempt(s){location}: {cause}")
self.task = task
self.cause = cause
self.attempts = attempts
+9 -27
View File
@@ -60,9 +60,7 @@ def _emit(
)
def _log_retry(
spec: TaskSpec[object], attempts: int, max_attempts: int, exc: BaseException
) -> None:
def _log_retry(spec: TaskSpec[object], attempts: int, max_attempts: int, exc: BaseException) -> None:
"""记录重试日志(sync 与 async 共享,便于测试覆盖)。"""
logger.warning(
"task %r failed (attempt %d/%d): %r; retrying",
@@ -154,9 +152,7 @@ async def _run_async_with_retry(
return spec.effective_fn(*args, **kwargs)
if spec.timeout is not None:
result.value = await asyncio.wait_for(
loop.run_in_executor(None, fn_call), timeout=spec.timeout
)
result.value = await asyncio.wait_for(loop.run_in_executor(None, fn_call), timeout=spec.timeout)
else:
result.value = await loop.run_in_executor(None, fn_call)
result.status = TaskStatus.SUCCESS
@@ -188,9 +184,7 @@ def _build_context(
global_context: Mapping[str, Any],
) -> Mapping[str, Any]:
"""将全局上下文限制为本任务的依赖。"""
return {
dep: global_context[dep] for dep in spec.depends_on if dep in global_context
}
return {dep: global_context[dep] for dep in spec.depends_on if dep in global_context}
def _execute_layer_sequential(
@@ -237,9 +231,7 @@ def _execute_layer_threaded(
if backend.has(name):
cached = backend.get(name)
context[name] = cached
result = TaskResult(
spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached
)
result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached)
report.results[name] = result
_emit(on_event, result)
else:
@@ -281,9 +273,7 @@ async def _execute_layer_async(
if backend.has(name):
cached = backend.get(name)
context[name] = cached
result = TaskResult(
spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached
)
result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached)
report.results[name] = result
_emit(on_event, result)
else:
@@ -346,9 +336,7 @@ def run(
不会被执行。
"""
if strategy not in ("sequential", "thread", "async"):
raise ValueError(
f"unknown strategy {strategy!r}; expected 'sequential', 'thread', or 'async'."
)
raise ValueError(f"unknown strategy {strategy!r}; expected 'sequential', 'thread', or 'async'.")
graph.validate()
layers = graph.layers()
@@ -365,9 +353,7 @@ def run(
if strategy == "sequential":
_drive_sequential(graph, layers, context, report, backend, on_event)
elif strategy == "thread":
_drive_threaded(
graph, layers, context, report, backend, on_event, max_workers
)
_drive_threaded(graph, layers, context, report, backend, on_event, max_workers)
else:
_drive_async(graph, layers, context, report, backend, on_event)
except TaskFailedError:
@@ -409,9 +395,7 @@ def _drive_threaded(
) -> None:
for idx, layer in enumerate(layers, 1):
workers = max_workers or max(1, min(32, len(layer)))
_execute_layer_threaded(
layer, graph, context, report, backend, idx, on_event, workers
)
_execute_layer_threaded(layer, graph, context, report, backend, idx, on_event, workers)
def _drive_async(
@@ -434,6 +418,4 @@ async def _async_drive(
on_event: Optional[EventCallback],
) -> None:
for idx, layer in enumerate(layers, 1):
await _execute_layer_async(
layer, graph, context, report, backend, idx, on_event
)
await _execute_layer_async(layer, graph, context, report, backend, idx, on_event)
+6 -7
View File
@@ -10,12 +10,14 @@ from __future__ import annotations
import sys
from typing import Dict, Iterable, List, Mapping, Sequence, Set, Tuple
from typing_extensions import override
from .errors import CycleError, DuplicateTaskError, MissingDependencyError
from .task import TaskSpec
# graphlib 自 3.9 起进入标准库;3.8 回退到 backport。
if sys.version_info >= (3, 9): # pragma: no cover
import graphlib
import graphlib # pyright: ignore[reportUnreachable]
_TopologicalSorter = graphlib.TopologicalSorter
else: # pragma: no cover
@@ -157,9 +159,7 @@ class Graph:
for spec in self._specs.values():
if wanted & set(spec.tags):
pruned_deps = tuple(
d
for d in spec.depends_on
if d in self._specs and (wanted & set(self._specs[d].tags))
d for d in spec.depends_on if d in self._specs and (wanted & set(self._specs[d].tags))
)
kept.append(
TaskSpec(
@@ -217,9 +217,7 @@ class Graph:
valid = {"TD", "TB", "BT", "LR", "RL"}
orientation = orientation.upper()
if orientation not in valid:
raise ValueError(
f"Invalid orientation {orientation!r}; expected one of {sorted(valid)}."
)
raise ValueError(f"Invalid orientation {orientation!r}; expected one of {sorted(valid)}.")
lines: List[str] = [f"graph {orientation}"]
for name in self._specs:
lines.append(f' {name}["{name}"]')
@@ -238,6 +236,7 @@ class Graph:
out.append(f" Layer {layer_idx}: {layer}")
return "\n".join(out)
@override
def __repr__(self) -> str:
return f"Graph(tasks={len(self._specs)})"
+9 -18
View File
@@ -16,9 +16,9 @@ import enum
import sys
from typing import Dict, List, Optional, Sequence
from ..errors import PyFlowXError
from ..executors import Strategy, run
from ..graph import Graph
from .errors import PyFlowXError
from .executors import Strategy, run
from .graph import Graph
__all__ = ["CliRunner", "CliExitCode"]
@@ -72,19 +72,10 @@ class CliRunner:
runner.run(["test", "--strategy", "sequential"])
"""
def __init__(
self,
*,
strategy: Strategy = "sequential",
description: str = "",
**graphs: Graph,
) -> None:
def __init__(self, *, strategy: Strategy = "sequential", description: str = "", graphs: Dict[str, Graph]) -> None:
if not graphs:
raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)")
# 校验所有值都是 Graph
for name, graph in graphs.items():
if not isinstance(graph, Graph):
raise TypeError(f"CliRunner 命令 {name!r} 的值必须是 Graph 实例, 实际是 {type(graph).__name__}")
self._graphs: Dict[str, Graph] = dict(graphs)
self._strategy: Strategy = strategy
self._description: str = description
@@ -139,23 +130,23 @@ class CliRunner:
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=self._format_commands_help(),
)
parser.add_argument(
_ = parser.add_argument(
"command",
nargs="?",
help="要执行的命令",
)
parser.add_argument(
_ = parser.add_argument(
"--strategy",
choices=["sequential", "thread", "async"],
default=self._strategy,
help="执行策略 (默认: %(default)s)",
)
parser.add_argument(
_ = parser.add_argument(
"--dry-run",
action="store_true",
help="只打印执行计划, 不实际运行",
)
parser.add_argument(
_ = parser.add_argument(
"--list",
action="store_true",
help="列出所有可用命令",
+1 -3
View File
@@ -227,9 +227,7 @@ class TaskSpec(Generic[T]):
if callable(cmd):
return cmd # type: ignore[return-value]
raise TypeError(
f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}"
)
raise TypeError(f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}")
def should_execute(self) -> bool:
"""检查任务是否应该执行.