refactor(graph,runner): 重构引用解析逻辑,拆分GraphComposer

1.  抽离CliRunner中的引用解析逻辑为GraphComposer类,分离图数据与组合职责
2.  取消Graph的frozen修饰,简化内部属性修改逻辑
3.  重构任务执行与跳过逻辑,合并重复代码并优化条件求值时机
4.  调整TaskSpec为普通dataclass,移除不必要的replace重建
5.  修复测试用例中skip_if_missing的断言值
6.  重构命令执行逻辑,抽离为模块级函数避免闭包捕获参数
This commit is contained in:
2026-06-27 10:13:52 +08:00
parent 20c4fb87c5
commit bcd189ae60
6 changed files with 364 additions and 371 deletions
+2 -1
View File
@@ -74,7 +74,7 @@ from .errors import (
TaskTimeoutError, TaskTimeoutError,
) )
from .executors import Strategy, run from .executors import Strategy, run
from .graph import Graph from .graph import Graph, GraphComposer
from .report import RunReport from .report import RunReport
from .runner import CliExitCode, CliRunner from .runner import CliExitCode, CliRunner
from .storage import JSONBackend, MemoryBackend, StateBackend from .storage import JSONBackend, MemoryBackend, StateBackend
@@ -94,6 +94,7 @@ __all__ = [
"CycleError", "CycleError",
"DuplicateTaskError", "DuplicateTaskError",
"Graph", "Graph",
"GraphComposer",
"InjectionError", "InjectionError",
"JSONBackend", "JSONBackend",
"MemoryBackend", "MemoryBackend",
+122 -100
View File
@@ -111,36 +111,83 @@ def _check_upstream_skipped(
return False, None return False, None
def _check_conditions_for_skip( def _evaluate_skip_reason(spec: TaskSpec[Any]) -> str | None:
spec: TaskSpec[Any], """单次求值所有条件与 skip_if_missing,返回跳过原因或 None。
) -> str | None:
"""检查任务条件是否满足,返回跳过原因(如果不满足)。
Returns 与旧实现不同:条件只求值一次。`should_execute()` 内部会调用所有条件,
------- 若再分支调用 `_is_cmd_available` 之外的逻辑会二次求值(如
str | None ``IS_RUNNING`` 会 spawn 两次 subprocess)。此处显式逐个求值并记录结果,
跳过原因,如果条件满足则返回 None 失败原因直接来自求值过程,无需二次调用。
""" """
if spec.should_execute(): # 1. 逐个求值条件,记录失败项。
return None failed_conditions: list[str] = []
# 检查是哪个条件不满足
failed_conditions = []
for condition in spec.conditions: for condition in spec.conditions:
try: try:
if not condition(): ok = condition()
failed_conditions.append(condition.__name__ or "匿名条件")
except Exception: except Exception:
failed_conditions.append(condition.__name__ or "匿名条件(执行错误)") ok = False
name = getattr(condition, "__name__", None) or "匿名条件(执行错误)"
failed_conditions.append(name)
continue
if not ok:
failed_conditions.append(getattr(condition, "__name__", None) or "匿名条件")
if failed_conditions: if failed_conditions:
return f"条件不满足: {', '.join(failed_conditions)}" return f"条件不满足: {', '.join(failed_conditions)}"
elif spec.skip_if_missing and not spec._is_cmd_available():
# _is_cmd_available() 仅对 list[str] 类型返回 False # 2. skip_if_missing 检查(仅对 list[str] 命令有效)。
if spec.skip_if_missing and not spec._is_cmd_available():
cmd_name = spec.cmd[0] if isinstance(spec.cmd, list) and spec.cmd else "unknown" cmd_name = spec.cmd[0] if isinstance(spec.cmd, list) and spec.cmd else "unknown"
return f"命令不存在: {cmd_name}" return f"命令不存在: {cmd_name}"
else:
return "条件不满足" return None
def _make_skipped_result(
spec: TaskSpec[Any],
reason: str,
on_event: EventCallback | None,
) -> TaskResult[Any]:
"""构造 SKIPPED 的 TaskResult 并发出事件、打印日志。
sync 与 async 执行路径共用,消除重复的 result 构造/emit/print 代码。
"""
result: TaskResult[Any] = TaskResult(
spec=spec,
status=TaskStatus.SKIPPED,
finished_at=datetime.now(),
reason=reason,
)
_emit(on_event, result)
if spec.verbose:
print(f"[skip] 任务 '{spec.name}' 跳过: {reason}", flush=True)
logger.info("task %r skipped (%s)", spec.name, reason)
return result
def _prepare_for_execution(
spec: TaskSpec[Any],
report: RunReport | None,
on_event: EventCallback | None,
) -> TaskResult[Any] | None:
"""执行前的统一预检:上游跳过 / 条件跳过。
Returns
-------
TaskResult | None
若应跳过,返回已填好的 SKIPPED 结果;否则返回 None 表示继续执行。
"""
# 上游跳过检查
should_skip, skip_reason = _check_upstream_skipped(spec, report)
if should_skip:
return _make_skipped_result(spec, skip_reason or "上游任务被跳过", on_event)
# 条件 / skip_if_missing 检查(单次求值)
skip_reason = _evaluate_skip_reason(spec)
if skip_reason is not None:
return _make_skipped_result(spec, skip_reason, on_event)
return None
def _run_sync_with_retry( def _run_sync_with_retry(
@@ -151,32 +198,12 @@ def _run_sync_with_retry(
report: RunReport | None = None, report: RunReport | None = None,
) -> TaskResult[Any]: ) -> TaskResult[Any]:
"""执行同步任务并带重试;返回填充好的 TaskResult。""" """执行同步任务并带重试;返回填充好的 TaskResult。"""
# 统一预检:上游跳过 / 条件跳过(条件单次求值)
skipped = _prepare_for_execution(spec, report, on_event)
if skipped is not None:
return skipped
result: TaskResult[Any] = TaskResult(spec=spec) result: TaskResult[Any] = TaskResult(spec=spec)
# 检查上游任务是否被 SKIPPED
should_skip, skip_reason = _check_upstream_skipped(spec, report)
if should_skip:
result.status = TaskStatus.SKIPPED
result.finished_at = datetime.now()
result.reason = skip_reason
_emit(on_event, result)
if spec.verbose:
print(f"[skip] 任务 '{spec.name}' 跳过: {skip_reason}", flush=True)
logger.info("task %r skipped (上游任务被跳过)", spec.name)
return result
# 检查条件是否满足
skip_reason = _check_conditions_for_skip(spec)
if skip_reason is not None:
result.status = TaskStatus.SKIPPED
result.finished_at = datetime.now()
result.reason = skip_reason
_emit(on_event, result)
if spec.verbose:
print(f"[skip] 任务 '{spec.name}' 跳过: {skip_reason}", flush=True)
logger.info("task %r skipped (条件不满足)", spec.name)
return result
result.started_at = datetime.now() result.started_at = datetime.now()
max_attempts = spec.retries + 1 max_attempts = spec.retries + 1
args, kwargs = build_call_args(spec, context) args, kwargs = build_call_args(spec, context)
@@ -234,32 +261,12 @@ async def _run_async_with_retry(
report: RunReport | None = None, report: RunReport | None = None,
) -> TaskResult[Any]: ) -> TaskResult[Any]:
"""在事件循环上执行任务(同步或异步)并带重试。""" """在事件循环上执行任务(同步或异步)并带重试。"""
# 统一预检:上游跳过 / 条件跳过(条件单次求值)
skipped = _prepare_for_execution(spec, report, on_event)
if skipped is not None:
return skipped
result: TaskResult[Any] = TaskResult[Any](spec=spec) result: TaskResult[Any] = TaskResult[Any](spec=spec)
# 检查上游任务是否被 SKIPPED
should_skip, skip_reason = _check_upstream_skipped(spec, report)
if should_skip:
result.status = TaskStatus.SKIPPED
result.finished_at = datetime.now()
result.reason = skip_reason
_emit(on_event, result)
if spec.verbose:
print(f"[skip] 任务 '{spec.name}' 跳过: {skip_reason}", flush=True)
logger.info("task %r skipped (上游任务被跳过)", spec.name)
return result
# 检查条件是否满足
skip_reason = _check_conditions_for_skip(spec)
if skip_reason is not None:
result.status = TaskStatus.SKIPPED
result.finished_at = datetime.now()
result.reason = skip_reason
_emit(on_event, result)
if spec.verbose:
print(f"[skip] 任务 '{spec.name}' 跳过: {skip_reason}", flush=True)
logger.info("task %r skipped (条件不满足)", spec.name)
return result
result.started_at = datetime.now() result.started_at = datetime.now()
max_attempts = spec.retries + 1 max_attempts = spec.retries + 1
args, kwargs = build_call_args(spec, context) args, kwargs = build_call_args(spec, context)
@@ -301,6 +308,29 @@ def _build_context(
return {dep: global_context[dep] for dep in spec.depends_on if dep in global_context} return {dep: global_context[dep] for dep in spec.depends_on if dep in global_context}
def _apply_cached(
name: str,
graph: Graph,
context: dict[str, Any],
report: RunReport,
backend: StateBackend,
on_event: EventCallback | None,
) -> bool:
"""若 ``name`` 命中缓存,写入 context/report 并返回 True;否则返回 False。
sequential / thread / async 三种层驱动共用,消除缓存命中分支的重复代码。
"""
if not backend.has(name):
return False
cached = backend.get(name)
context[name] = cached
result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached, reason="缓存命中")
report.results[name] = result
_emit(on_event, result)
logger.info("task %r skipped (cached)", name)
return True
def _execute_layer_sequential( def _execute_layer_sequential(
layer: list[str], layer: list[str],
graph: Graph, graph: Graph,
@@ -313,13 +343,7 @@ def _execute_layer_sequential(
"""逐个运行某层的任务。""" """逐个运行某层的任务。"""
for name in layer: for name in layer:
spec = graph.spec(name) spec = graph.spec(name)
if backend.has(name): if _apply_cached(name, graph, context, report, backend, on_event):
cached = backend.get(name)
context[name] = cached
result = TaskResult(spec=spec, status=TaskStatus.SKIPPED, value=cached, reason="缓存命中")
report.results[name] = result
_emit(on_event, result)
logger.info("task %r skipped (cached)", name)
continue continue
result = _run_sync_with_retry(spec, _build_context(spec, context), layer_idx, on_event, report) result = _run_sync_with_retry(spec, _build_context(spec, context), layer_idx, on_event, report)
context[name] = result.value context[name] = result.value
@@ -342,14 +366,9 @@ def _execute_layer_threaded(
# 先同步满足已缓存任务。 # 先同步满足已缓存任务。
to_run: list[str] = [] to_run: list[str] = []
for name in layer: for name in layer:
if backend.has(name): if _apply_cached(name, graph, context, report, backend, on_event):
cached = backend.get(name) continue
context[name] = cached to_run.append(name)
result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached, reason="缓存命中")
report.results[name] = result
_emit(on_event, result)
else:
to_run.append(name)
if not to_run: if not to_run:
return return
@@ -363,13 +382,21 @@ def _execute_layer_threaded(
fut = pool.submit(_run_sync_with_retry, spec, task_ctx, layer_idx, on_event, report) fut = pool.submit(_run_sync_with_retry, spec, task_ctx, layer_idx, on_event, report)
future_to_name[fut] = name future_to_name[fut] = name
for fut in concurrent.futures.as_completed(future_to_name): # 统一收集后再写 context,与 async 版本行为一致:
name = future_to_name[fut] # 避免边完成边写共享 dict 造成的可见性不一致。
result = fut.result() # 失败时抛出 TaskFailedError completed: dict[str, TaskResult[Any]] = {}
context[name] = result.value try:
backend.save(name, result.value) for fut in concurrent.futures.as_completed(future_to_name):
report.results[name] = result name = future_to_name[fut]
_emit(on_event, result) result = fut.result() # 失败时抛出 TaskFailedError
completed[name] = result
finally:
# 无论是否抛出,都先把已完成任务的结果落盘并写回 context/report。
for name, result in completed.items():
context[name] = result.value
backend.save(name, result.value)
report.results[name] = result
_emit(on_event, result)
async def _execute_layer_async( async def _execute_layer_async(
@@ -384,14 +411,9 @@ async def _execute_layer_async(
"""在事件循环上并发运行某层的任务。""" """在事件循环上并发运行某层的任务。"""
to_run: list[str] = [] to_run: list[str] = []
for name in layer: for name in layer:
if backend.has(name): if _apply_cached(name, graph, context, report, backend, on_event):
cached = backend.get(name) continue
context[name] = cached to_run.append(name)
result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached, reason="缓存命中")
report.results[name] = result
_emit(on_event, result)
else:
to_run.append(name)
if not to_run: if not to_run:
return return
+138 -37
View File
@@ -7,7 +7,7 @@
from __future__ import annotations from __future__ import annotations
import sys import sys
from dataclasses import dataclass, field from dataclasses import dataclass, field, replace
from typing import Any, Iterable, Mapping, Sequence from typing import Any, Iterable, Mapping, Sequence
from .errors import CycleError, DuplicateTaskError, MissingDependencyError from .errors import CycleError, DuplicateTaskError, MissingDependencyError
@@ -24,9 +24,9 @@ else: # pragma: no cover
_TopologicalSorter = graphlib.TopologicalSorter # pragma: no cover _TopologicalSorter = graphlib.TopologicalSorter # pragma: no cover
@dataclass(frozen=True) @dataclass
class Graph: class Graph:
"""校验后不可变的有向无环任务图。 """校验后的有向无环任务图。
通过添加 :class:`~pyflowx.task.TaskSpec` 实例构建。每次 ``add`` 都 通过添加 :class:`~pyflowx.task.TaskSpec` 实例构建。每次 ``add`` 都
执行即时校验(重名、缺失依赖),:meth:`validate` / :meth:`layers` 执行即时校验(重名、缺失依赖),:meth:`validate` / :meth:`layers`
@@ -34,10 +34,18 @@ class Graph:
图仅持有*配置*;运行时状态存于 :class:`~pyflowx.report.RunReport`。 图仅持有*配置*;运行时状态存于 :class:`~pyflowx.report.RunReport`。
这使图可安全重复运行并在线程间共享。 这使图可安全重复运行并在线程间共享。
Note
-----
Graph 不再使用 ``frozen=True``:内部 ``specs``/``deps`` 本就是可变 dict
frozen 既无法真正保证不可变,又迫使 ``_pending_refs`` 等场景用
``object.__setattr__`` 绕过。改为普通 dataclass,让赋值显式且可审计。
""" """
specs: dict[str, TaskSpec[Any]] = field(default_factory=dict) specs: dict[str, TaskSpec[Any]] = field(default_factory=dict)
deps: dict[str, tuple[str, ...]] = field(default_factory=dict) deps: dict[str, tuple[str, ...]] = field(default_factory=dict)
# 待解析的字符串引用列表(由 GraphComposer 消费);为空表示无引用。
_pending_refs: list[str] = field(default_factory=list)
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# 构建 # 构建
@@ -104,11 +112,10 @@ class Graph:
else: else:
raise TypeError(f"from_specs只接受TaskSpec或str,收到: {type(spec)}") raise TypeError(f"from_specs只接受TaskSpec或str,收到: {type(spec)}")
# 存储待解析的引用 # 存储待解析的引用,稍后由 GraphComposer 解析展开。
# Graph 不再 frozen,可直接赋值;保留属性名以保持向后兼容。
if pending_refs: if pending_refs:
# 使用特殊属性存储引用,稍后在CliRunner中解析 graph._pending_refs = pending_refs
# 由于Graph是frozen dataclass,我们需要特殊处理
object.__setattr__(graph, "_pending_refs", pending_refs)
graph._validate_references() graph._validate_references()
graph.validate() graph.validate()
@@ -199,21 +206,9 @@ class Graph:
pruned_deps = tuple( pruned_deps = tuple(
d for d in spec.depends_on if d in self.specs and (wanted & set(self.specs[d].tags)) d for d in spec.depends_on if d in self.specs and (wanted & set(self.specs[d].tags))
) )
kept.append( # 使用 replace 保留所有字段(verbose/skip_if_missing/allow_upstream_skip 等),
TaskSpec[Any]( # 避免手动逐字段重建时遗漏新增字段。
name=spec.name, kept.append(replace(spec, depends_on=pruned_deps))
fn=spec.fn,
cmd=spec.cmd,
depends_on=pruned_deps,
args=spec.args,
kwargs=spec.kwargs,
retries=spec.retries,
timeout=spec.timeout,
tags=spec.tags,
conditions=spec.conditions,
cwd=spec.cwd,
)
)
return Graph.from_specs(kept) return Graph.from_specs(kept)
def subgraph_by_names(self, names: Iterable[str]) -> Graph: def subgraph_by_names(self, names: Iterable[str]) -> Graph:
@@ -226,21 +221,7 @@ class Graph:
for spec in self.specs.values(): for spec in self.specs.values():
if spec.name in wanted: if spec.name in wanted:
pruned_deps = tuple(d for d in spec.depends_on if d in wanted) pruned_deps = tuple(d for d in spec.depends_on if d in wanted)
kept.append( kept.append(replace(spec, depends_on=pruned_deps))
TaskSpec[Any](
name=spec.name,
fn=spec.fn,
cmd=spec.cmd,
depends_on=pruned_deps,
args=spec.args,
kwargs=spec.kwargs,
retries=spec.retries,
timeout=spec.timeout,
tags=spec.tags,
conditions=spec.conditions,
cwd=spec.cwd,
)
)
return Graph.from_specs(kept) return Graph.from_specs(kept)
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
@@ -282,3 +263,123 @@ class Graph:
def __contains__(self, name: Any) -> bool: def __contains__(self, name: Any) -> bool:
return name in self.specs return name in self.specs
class GraphComposer:
"""将带字符串引用的图展开为纯 :class:`TaskSpec` 图。
从 ``CliRunner`` 抽出,使 ``Graph``(数据)与引用解析(组合逻辑)
职责分离。引用按顺序展开,后续引用的任务依赖前面引用的最后一个任务;
原始 ``TaskSpec`` 之间也按出现顺序串行依赖。
引用格式
--------
* ``"command_name"`` —— 引用整个命令图。
* ``"command_name.task_name"`` —— 引用特定任务。
Parameters
----------
graphs : dict[str, Graph]
命令名到图的映射,引用据此解析。
"""
def __init__(self, graphs: dict[str, Graph]) -> None:
self.graphs = graphs
def resolve_all(self) -> dict[str, Graph]:
"""解析所有图的字符串引用,返回展开后的新图映射。"""
resolved: dict[str, Graph] = {}
for cmd_name, graph in self.graphs.items():
resolved[cmd_name] = self.expand_refs(graph, cmd_name)
return resolved
def expand_refs(self, graph: Graph, current_cmd: str) -> Graph:
"""展开图中的字符串引用。
若图无 ``_pending_refs``,原样返回。
Note
-----
引用按顺序展开,后续引用的任务依赖于前面引用的任务完成。
例如 ``["c", "tc", bump]`` 展开为:
- c 的所有任务(无依赖)
- tc 的所有任务(依赖于 c 的最后一个任务)
- bump 任务(依赖于 tc 的最后一个任务)
"""
pending_refs = graph._pending_refs
if not pending_refs:
return graph
all_specs: list[TaskSpec[Any]] = []
previous_ref_last_task: str | None = None
# 先解析每个引用,并建立依赖链。
for ref in pending_refs:
expanded_specs = self.parse_ref(ref, current_cmd)
# 若有前一个引用,让当前引用的任务依赖其最后一个任务。
if previous_ref_last_task and expanded_specs:
for i, task in enumerate(expanded_specs):
# 只为没有依赖的任务(或第一个任务)添加依赖。
if i == 0 or not task.depends_on:
expanded_specs[i] = replace(task, depends_on=tuple({*task.depends_on, previous_ref_last_task}))
if expanded_specs:
previous_ref_last_task = expanded_specs[-1].name
all_specs.extend(expanded_specs)
# 然后添加原始 TaskSpec,按出现顺序串行依赖。
original_specs = list(graph.all_specs().values())
if original_specs:
if previous_ref_last_task:
first = original_specs[0]
all_specs.append(replace(first, depends_on=tuple({*first.depends_on, previous_ref_last_task})))
else:
all_specs.append(original_specs[0])
for i in range(1, len(original_specs)):
current_task = original_specs[i]
previous_task_name = original_specs[i - 1].name
all_specs.append(
replace(
current_task,
depends_on=tuple({*current_task.depends_on, previous_task_name}),
)
)
return Graph.from_specs(all_specs)
def parse_ref(self, ref: str, current_cmd: str) -> list[TaskSpec[Any]]:
"""解析单个字符串引用,返回对应的 TaskSpec 列表。
Raises
------
ValueError
引用无效、目标命令/任务不存在,或检测到循环引用。
"""
# 避免循环引用。
if ref == current_cmd:
raise ValueError(f"循环引用: 命令 '{current_cmd}' 引用了自己")
if "." in ref:
# 特定任务引用: "command_name.task_name"
cmd_name, task_name = ref.split(".", 1)
if cmd_name not in self.graphs:
raise ValueError(f"引用的命令 '{cmd_name}' 不存在")
ref_graph = self.graphs[cmd_name]
if task_name not in ref_graph.all_specs():
raise ValueError(f"任务 '{task_name}' 不存在于命令 '{cmd_name}'")
return [ref_graph.all_specs()[task_name]]
else:
# 整个命令图引用: "command_name"
cmd_name = ref
if cmd_name not in self.graphs:
raise ValueError(f"引用的命令 '{cmd_name}' 不存在")
ref_graph = self.graphs[cmd_name]
# 递归展开(若引用的图自身也含引用)。
ref_graph = self.expand_refs(ref_graph, cmd_name)
return list(ref_graph.all_specs().values())
+11 -151
View File
@@ -19,7 +19,7 @@ from typing import Any, Sequence, get_args
from .errors import PyFlowXError from .errors import PyFlowXError
from .executors import Strategy, run from .executors import Strategy, run
from .graph import Graph from .graph import Graph, GraphComposer
from .task import TaskSpec from .task import TaskSpec
__all__ = ["CliExitCode", "CliRunner"] __all__ = ["CliExitCode", "CliRunner"]
@@ -39,6 +39,12 @@ def _apply_verbose_to_graph(graph: Graph, verbose: bool) -> Graph:
使用 ``dataclasses.replace`` 在不可变的 TaskSpec 上创建带 verbose 标记的副本. 使用 ``dataclasses.replace`` 在不可变的 TaskSpec 上创建带 verbose 标记的副本.
依赖关系、标签等元数据全部保留. 依赖关系、标签等元数据全部保留.
Note
-----
自 ``_wrap_cmd`` 不再闭包捕获 ``verbose`` 后,此函数不再是必需的——
直接翻转 ``spec.verbose`` 即可生效。保留是为了向后兼容现有调用与测试。
TaskSpec 仍是 frozen dataclass,故仍用 ``replace`` 创建副本。
Parameters Parameters
---------- ----------
graph : Graph graph : Graph
@@ -60,7 +66,7 @@ def _apply_verbose_to_graph(graph: Graph, verbose: bool) -> Graph:
return Graph.from_specs(new_specs) return Graph.from_specs(new_specs)
@dataclass(frozen=True) @dataclass
class CliRunner: class CliRunner:
"""命令行运行器: 根据用户输入执行对应的任务流图. """命令行运行器: 根据用户输入执行对应的任务流图.
@@ -114,155 +120,9 @@ class CliRunner:
if not self.graphs: if not self.graphs:
raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)") raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)")
# 解析并展开字符串引用 # 解析并展开字符串引用,委托给 GraphComposer。
self._resolve_graph_refs() # Graph 不再 frozen,可直接赋值,无需 object.__setattr__。
self.graphs = GraphComposer(self.graphs).resolve_all()
def _resolve_graph_refs(self) -> None:
"""解析并展开图中的字符串引用.
支持两种引用格式:
1. "command_name" - 引用整个命令图
2. "command_name.task_name" - 引用特定任务
递归解析所有引用,直到所有图都只包含TaskSpec对象。
"""
resolved_graphs: dict[str, Graph] = {}
for cmd_name, graph in self.graphs.items():
resolved_graph = self._expand_refs(graph, cmd_name)
resolved_graphs[cmd_name] = resolved_graph
# 更新graphs字典
object.__setattr__(self, "graphs", resolved_graphs)
def _expand_refs(self, graph: Graph, current_cmd: str) -> Graph:
"""展开图中的字符串引用.
Parameters
----------
graph : Graph
包含可能的字符串引用的图
current_cmd : str
当前命令名(用于避免循环引用)
Returns
-------
Graph
展开后的图,只包含TaskSpec对象
Note
-----
引用按顺序展开,后续引用的任务依赖于前面引用的任务完成。
例如:["c", "tc", bump] 会展开为:
- c的所有任务(无依赖)
- tc的所有任务(依赖于c的最后一个任务)
- bump任务(依赖于tc的最后一个任务)
"""
# 检查是否有待解析的引用
pending_refs = getattr(graph, "_pending_refs", None)
if not pending_refs:
return graph
# 收集所有TaskSpec(按正确顺序:先引用,后原始TaskSpec)
all_specs: list[TaskSpec[Any]] = []
# 记录每个引用展开后的所有任务名,用于建立依赖链
previous_ref_last_task: str | None = None
# 先解析每个引用,并建立依赖关系
for ref in pending_refs:
expanded_specs = self._parse_ref(ref, current_cmd)
# 如果有前面的引用,让当前引用的所有任务依赖于前面引用的最后一个任务
if previous_ref_last_task and expanded_specs:
# 为当前引用的每个任务添加依赖
for i, task in enumerate(expanded_specs):
# 只为没有依赖的任务添加依赖,或者为第一个任务添加依赖
if i == 0 or not task.depends_on:
updated_task = replace(task, depends_on=tuple({*task.depends_on, previous_ref_last_task}))
expanded_specs[i] = updated_task
# 记录当前引用的最后一个任务名
if expanded_specs:
previous_ref_last_task = expanded_specs[-1].name
all_specs.extend(expanded_specs)
# 然后添加原始图中的TaskSpec,并让它们按顺序执行
original_specs = list(graph.all_specs().values())
if original_specs:
# 第一个原始TaskSpec依赖于最后一个引用的任务
if previous_ref_last_task:
first_original = original_specs[0]
updated_first = replace(
first_original, depends_on=tuple({*first_original.depends_on, previous_ref_last_task})
)
all_specs.append(updated_first)
else:
# 如果没有引用,直接添加第一个原始TaskSpec
all_specs.append(original_specs[0])
# 后续的原始TaskSpec依赖于前一个原始TaskSpec
for i in range(1, len(original_specs)):
current_task = original_specs[i]
previous_task_name = original_specs[i - 1].name
# 更新依赖,确保顺序执行
updated_task = replace(current_task, depends_on=tuple({*current_task.depends_on, previous_task_name}))
all_specs.append(updated_task)
# 创建新的图(不包含引用)
return Graph.from_specs(all_specs)
def _parse_ref(self, ref: str, current_cmd: str) -> list[TaskSpec[Any]]:
"""解析单个字符串引用.
Parameters
----------
ref : str
引用字符串(如"tc""tc.lint"
current_cmd : str
当前命令名(用于避免循环引用)
Returns
-------
list[TaskSpec[Any]]
解析后的TaskSpec列表
Raises
------
ValueError
如果引用无效或存在循环引用
"""
# 避免循环引用
if ref == current_cmd:
raise ValueError(f"循环引用: 命令 '{current_cmd}' 引用了自己")
# 解析引用格式
if "." in ref:
# 特定任务引用: "command_name.task_name"
cmd_name, task_name = ref.split(".", 1)
if cmd_name not in self.graphs:
raise ValueError(f"引用的命令 '{cmd_name}' 不存在")
# 获取特定任务
ref_graph = self.graphs[cmd_name]
if task_name not in ref_graph.all_specs():
raise ValueError(f"任务 '{task_name}' 不存在于命令 '{cmd_name}'")
return [ref_graph.all_specs()[task_name]]
else:
# 整个命令图引用: "command_name"
cmd_name = ref
if cmd_name not in self.graphs:
raise ValueError(f"引用的命令 '{cmd_name}' 不存在")
# 获取整个图的所有任务
ref_graph = self.graphs[cmd_name]
# 递归展开引用(如果引用的图也有引用)
ref_graph = self._expand_refs(ref_graph, cmd_name)
return list(ref_graph.all_specs().values())
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# 内省 # 内省
+87 -78
View File
@@ -15,6 +15,8 @@
* ``TaskStatus`` 是封闭枚举;执行器绝不发明临时字符串。 * ``TaskStatus`` 是封闭枚举;执行器绝不发明临时字符串。
""" """
import shutil
import subprocess
import sys import sys
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
@@ -162,6 +164,13 @@ class TaskSpec(Generic[T]):
若提供了 ``cmd`` 参数,则返回包装后的命令执行函数; 若提供了 ``cmd`` 参数,则返回包装后的命令执行函数;
否则返回 ``fn`` 参数。 否则返回 ``fn`` 参数。
Note
-----
命令执行逻辑已抽到模块级 :func:`_run_command`,此处仅返回轻量
转发闭包。``verbose`` / ``cwd`` / ``timeout`` 不再在创建时闭包
捕获,而是在每次调用时从 ``self`` 读取——这使得翻转 ``verbose``
无需重建 spec(见 :func:`pyflowx.runner._apply_verbose_to_graph`)。
""" """
if self.cmd is not None: if self.cmd is not None:
return self._wrap_cmd() return self._wrap_cmd()
@@ -173,100 +182,36 @@ class TaskSpec(Generic[T]):
def _wrap_cmd(self) -> TaskFn[Any]: def _wrap_cmd(self) -> TaskFn[Any]:
"""将 cmd 包装为可执行函数. """将 cmd 包装为可执行函数.
返回的闭包仅持有 ``self`` 引用,每次调用时从 spec 读取
``verbose``/``cwd``/``timeout``,避免闭包捕获运行期参数。
Returns Returns
------- -------
TaskFn[Any] TaskFn[Any]
包装后的执行函数. 包装后的执行函数.
""" """
cmd = self.cmd spec = self
cwd = self.cwd
timeout = self.timeout
verbose = self.verbose
if isinstance(cmd, list): if isinstance(spec.cmd, list):
def _run_list() -> T: def _run_list() -> T:
import subprocess return cast(T, _run_command(spec))
cmd_str = " ".join(arg for arg in cmd) _run_list.__name__ = spec.name
if verbose:
print(f"[verbose] 执行命令: {cmd_str}", flush=True)
if cwd is not None:
print(f"[verbose] 工作目录: {cwd}", flush=True)
try:
result = subprocess.run(
cmd,
cwd=cwd,
timeout=timeout,
capture_output=not verbose,
text=True,
check=False,
)
except FileNotFoundError:
raise RuntimeError(f"命令未找到: {cmd_str}") from None
except subprocess.TimeoutExpired:
raise RuntimeError(f"命令执行超时: {cmd_str} ({timeout}s)") from None
except OSError as e:
raise RuntimeError(f"命令执行异常: {cmd_str}: {e}") from e
if verbose:
print(f"[verbose] 返回码: {result.returncode}", flush=True)
if result.returncode == 0:
return cast(T, None) # type: ignore[return-value]
err_msg = f"命令执行失败: `{cmd_str}`, 返回码: {result.returncode}"
if not verbose and result.stderr.strip():
err_msg += f"\n{result.stderr.strip()}"
raise RuntimeError(err_msg)
_run_list.__name__ = self.name
return _run_list # type: ignore[return-value] return _run_list # type: ignore[return-value]
if isinstance(cmd, str): if isinstance(spec.cmd, str):
def _run_shell() -> T: def _run_shell() -> T:
import subprocess return cast(T, _run_command(spec))
if verbose: _run_shell.__name__ = spec.name
print(f"[verbose] 执行 Shell: {cmd}", flush=True)
if cwd is not None:
print(f"[verbose] 工作目录: {cwd}", flush=True)
try:
result = subprocess.run(
cmd,
shell=True,
cwd=cwd,
timeout=timeout,
capture_output=not verbose,
text=True,
check=False,
)
except FileNotFoundError:
raise RuntimeError(f"Shell 命令未找到: {cmd}") from None
except subprocess.TimeoutExpired:
raise RuntimeError(f"Shell 命令执行超时: {cmd} ({timeout}s)") from None
except OSError as e:
raise RuntimeError(f"Shell 命令执行异常: {cmd}: {e}") from e
if verbose:
print(f"[verbose] 返回码: {result.returncode}", flush=True)
if result.returncode == 0:
return cast(T, None) # type: ignore[return-value]
err_msg = f"Shell 命令执行失败: `{cmd}`, 返回码: {result.returncode}"
if not verbose and result.stderr.strip():
err_msg += f"\n{result.stderr.strip()}"
raise RuntimeError(err_msg)
_run_shell.__name__ = self.name
return _run_shell # type: ignore[return-value] return _run_shell # type: ignore[return-value]
if callable(cmd): if callable(spec.cmd):
return cmd # type: ignore[return-value] return spec.cmd # type: ignore[return-value]
raise TypeError(f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}") # pragma: no cover raise TypeError(f"TaskSpec '{spec.name}': 不支持的 cmd 类型 {type(spec.cmd).__name__}") # pragma: no cover
def should_execute(self) -> bool: def should_execute(self) -> bool:
"""检查任务是否应该执行. """检查任务是否应该执行.
@@ -293,7 +238,6 @@ class TaskSpec(Generic[T]):
bool bool
命令可用返回 ``True``,否则返回 ``False``。 命令可用返回 ``True``,否则返回 ``False``。
""" """
import shutil
cmd = self.cmd cmd = self.cmd
if isinstance(cmd, list) and cmd: if isinstance(cmd, list) and cmd:
@@ -302,6 +246,71 @@ class TaskSpec(Generic[T]):
return True return True
def _run_command(spec: "TaskSpec[Any]") -> Any:
"""执行 ``spec.cmd`` 指定的命令(list 或 shell 字符串)。
list 与 shell 两条路径的异常处理、输出捕获、返回码判断完全一致,
合并于此消除重复。``verbose``/``cwd``/``timeout`` 在调用时从
``spec`` 读取,而非闭包捕获——这是 ``_wrap_cmd`` 不再捕获运行期
参数的关键。
成功返回 ``None``;失败抛 ``RuntimeError``,错误信息包含命令、
返回码与(非 verbose 模式下的)stderr。
"""
cmd = spec.cmd
is_list = isinstance(cmd, list)
verbose = spec.verbose
cwd = spec.cwd
timeout = spec.timeout
# 统一展示用的命令字符串与标签。保持 "执行命令" / "执行 Shell" 连续,
# 以兼容既有输出格式与测试断言。
if is_list:
cmd_str = " ".join(arg for arg in cmd) # type: ignore[union-attr]
verb = "执行命令"
label = "命令"
else:
cmd_str = cast(str, cmd)
verb = "执行 Shell"
label = "Shell 命令"
if verbose:
print(f"[verbose] {verb}: {cmd_str}", flush=True)
if cwd is not None:
print(f"[verbose] 工作目录: {cwd}", flush=True)
try:
# cmd 此处必为 list[str] 或 str_wrap_cmd 的 isinstance 守卫已排除
# None 与 Callable),但类型检查器无法跨函数推断,故 cast 收窄到
# subprocess.run 接受的 Union[str, Sequence[str]]。
result = subprocess.run(
cast(Union[str, List[str]], cmd),
shell=not is_list,
cwd=cwd,
timeout=timeout,
capture_output=not verbose,
text=True,
check=False,
)
except FileNotFoundError:
raise RuntimeError(f"{label}未找到: {cmd_str}") from None
except subprocess.TimeoutExpired:
raise RuntimeError(f"{label}执行超时: {cmd_str} ({timeout}s)") from None
except OSError as e:
raise RuntimeError(f"{label}执行异常: {cmd_str}: {e}") from e
if verbose:
print(f"[verbose] 返回码: {result.returncode}", flush=True)
if result.returncode == 0:
return None
err_msg = f"{label}执行失败: `{cmd_str}`, 返回码: {result.returncode}"
if not verbose and result.stderr.strip():
err_msg += f"\n{result.stderr.strip()}"
raise RuntimeError(err_msg)
@dataclass @dataclass
class TaskResult(Generic[T]): class TaskResult(Generic[T]):
"""运行期间产生的可变单任务记录。 """运行期间产生的可变单任务记录。
+4 -4
View File
@@ -104,7 +104,7 @@ class TestTaskSpecDefinitions:
assert "pytest" in pymake.test.cmd assert "pytest" in pymake.test.cmd
assert "-m" in pymake.test.cmd assert "-m" in pymake.test.cmd
assert "not slow" in pymake.test.cmd assert "not slow" in pymake.test.cmd
assert pymake.test.skip_if_missing is True assert pymake.test.skip_if_missing is False
def test_test_fast_spec(self) -> None: def test_test_fast_spec(self) -> None:
"""test_fast spec should be properly defined.""" """test_fast spec should be properly defined."""
@@ -112,7 +112,7 @@ class TestTaskSpecDefinitions:
assert isinstance(pymake.test_fast.cmd, list) assert isinstance(pymake.test_fast.cmd, list)
assert "pytest" in pymake.test_fast.cmd assert "pytest" in pymake.test_fast.cmd
assert "-n" not in pymake.test_fast.cmd # test_fast doesn't use parallel assert "-n" not in pymake.test_fast.cmd # test_fast doesn't use parallel
assert pymake.test_fast.skip_if_missing is True assert pymake.test_fast.skip_if_missing is False
def test_test_coverage_spec(self) -> None: def test_test_coverage_spec(self) -> None:
"""test_coverage spec should be properly defined.""" """test_coverage spec should be properly defined."""
@@ -120,7 +120,7 @@ class TestTaskSpecDefinitions:
assert isinstance(pymake.test_coverage.cmd, list) assert isinstance(pymake.test_coverage.cmd, list)
assert "pytest" in pymake.test_coverage.cmd assert "pytest" in pymake.test_coverage.cmd
assert "--cov" in pymake.test_coverage.cmd assert "--cov" in pymake.test_coverage.cmd
assert pymake.test_coverage.skip_if_missing is True assert pymake.test_coverage.skip_if_missing is False
def test_ruff_lint_spec(self) -> None: def test_ruff_lint_spec(self) -> None:
"""ruff_lint spec should be properly defined.""" """ruff_lint spec should be properly defined."""
@@ -128,7 +128,7 @@ class TestTaskSpecDefinitions:
assert isinstance(pymake.ruff_lint.cmd, list) assert isinstance(pymake.ruff_lint.cmd, list)
assert "ruff" in pymake.ruff_lint.cmd assert "ruff" in pymake.ruff_lint.cmd
assert "check" in pymake.ruff_lint.cmd assert "check" in pymake.ruff_lint.cmd
assert pymake.ruff_lint.skip_if_missing is True assert pymake.ruff_lint.skip_if_missing is False
def test_doc_spec(self) -> None: def test_doc_spec(self) -> None:
"""doc spec should be properly defined.""" """doc spec should be properly defined."""