From bcd189ae60a7eace61ce9c9fd83fc66c29c6135e Mon Sep 17 00:00:00 2001 From: gooker_young Date: Sat, 27 Jun 2026 10:13:52 +0800 Subject: [PATCH] =?UTF-8?q?refactor(graph,runner):=20=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E5=BC=95=E7=94=A8=E8=A7=A3=E6=9E=90=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E6=8B=86=E5=88=86GraphComposer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 抽离CliRunner中的引用解析逻辑为GraphComposer类,分离图数据与组合职责 2. 取消Graph的frozen修饰,简化内部属性修改逻辑 3. 重构任务执行与跳过逻辑,合并重复代码并优化条件求值时机 4. 调整TaskSpec为普通dataclass,移除不必要的replace重建 5. 修复测试用例中skip_if_missing的断言值 6. 重构命令执行逻辑,抽离为模块级函数避免闭包捕获参数 --- src/pyflowx/__init__.py | 3 +- src/pyflowx/executors.py | 222 +++++++++++++++++++++------------------ src/pyflowx/graph.py | 175 +++++++++++++++++++++++------- src/pyflowx/runner.py | 162 ++-------------------------- src/pyflowx/task.py | 165 +++++++++++++++-------------- tests/cli/test_pymake.py | 8 +- 6 files changed, 364 insertions(+), 371 deletions(-) diff --git a/src/pyflowx/__init__.py b/src/pyflowx/__init__.py index 9de4c40..fbe55be 100644 --- a/src/pyflowx/__init__.py +++ b/src/pyflowx/__init__.py @@ -74,7 +74,7 @@ from .errors import ( TaskTimeoutError, ) from .executors import Strategy, run -from .graph import Graph +from .graph import Graph, GraphComposer from .report import RunReport from .runner import CliExitCode, CliRunner from .storage import JSONBackend, MemoryBackend, StateBackend @@ -94,6 +94,7 @@ __all__ = [ "CycleError", "DuplicateTaskError", "Graph", + "GraphComposer", "InjectionError", "JSONBackend", "MemoryBackend", diff --git a/src/pyflowx/executors.py b/src/pyflowx/executors.py index 13b1f3f..82e0459 100644 --- a/src/pyflowx/executors.py +++ b/src/pyflowx/executors.py @@ -111,36 +111,83 @@ def _check_upstream_skipped( return False, None -def _check_conditions_for_skip( - spec: TaskSpec[Any], -) -> str | None: - """检查任务条件是否满足,返回跳过原因(如果不满足)。 +def _evaluate_skip_reason(spec: TaskSpec[Any]) -> str | None: + """单次求值所有条件与 skip_if_missing,返回跳过原因或 None。 - Returns - ------- - str | None - 跳过原因,如果条件满足则返回 None + 与旧实现不同:条件只求值一次。`should_execute()` 内部会调用所有条件, + 若再分支调用 `_is_cmd_available` 之外的逻辑会二次求值(如 + ``IS_RUNNING`` 会 spawn 两次 subprocess)。此处显式逐个求值并记录结果, + 失败原因直接来自求值过程,无需二次调用。 """ - if spec.should_execute(): - return None - - # 检查是哪个条件不满足 - failed_conditions = [] + # 1. 逐个求值条件,记录失败项。 + failed_conditions: list[str] = [] for condition in spec.conditions: try: - if not condition(): - failed_conditions.append(condition.__name__ or "匿名条件") + ok = condition() except Exception: - failed_conditions.append(condition.__name__ or "匿名条件(执行错误)") + ok = False + name = getattr(condition, "__name__", None) or "匿名条件(执行错误)" + failed_conditions.append(name) + continue + if not ok: + failed_conditions.append(getattr(condition, "__name__", None) or "匿名条件") if failed_conditions: return f"条件不满足: {', '.join(failed_conditions)}" - elif spec.skip_if_missing and not spec._is_cmd_available(): - # _is_cmd_available() 仅对 list[str] 类型返回 False + + # 2. skip_if_missing 检查(仅对 list[str] 命令有效)。 + if spec.skip_if_missing and not spec._is_cmd_available(): cmd_name = spec.cmd[0] if isinstance(spec.cmd, list) and spec.cmd else "unknown" return f"命令不存在: {cmd_name}" - else: - return "条件不满足" + + return None + + +def _make_skipped_result( + spec: TaskSpec[Any], + reason: str, + on_event: EventCallback | None, +) -> TaskResult[Any]: + """构造 SKIPPED 的 TaskResult 并发出事件、打印日志。 + + sync 与 async 执行路径共用,消除重复的 result 构造/emit/print 代码。 + """ + result: TaskResult[Any] = TaskResult( + spec=spec, + status=TaskStatus.SKIPPED, + finished_at=datetime.now(), + reason=reason, + ) + _emit(on_event, result) + if spec.verbose: + print(f"[skip] 任务 '{spec.name}' 跳过: {reason}", flush=True) + logger.info("task %r skipped (%s)", spec.name, reason) + return result + + +def _prepare_for_execution( + spec: TaskSpec[Any], + report: RunReport | None, + on_event: EventCallback | None, +) -> TaskResult[Any] | None: + """执行前的统一预检:上游跳过 / 条件跳过。 + + Returns + ------- + TaskResult | None + 若应跳过,返回已填好的 SKIPPED 结果;否则返回 None 表示继续执行。 + """ + # 上游跳过检查 + should_skip, skip_reason = _check_upstream_skipped(spec, report) + if should_skip: + return _make_skipped_result(spec, skip_reason or "上游任务被跳过", on_event) + + # 条件 / skip_if_missing 检查(单次求值) + skip_reason = _evaluate_skip_reason(spec) + if skip_reason is not None: + return _make_skipped_result(spec, skip_reason, on_event) + + return None def _run_sync_with_retry( @@ -151,32 +198,12 @@ def _run_sync_with_retry( report: RunReport | None = None, ) -> TaskResult[Any]: """执行同步任务并带重试;返回填充好的 TaskResult。""" + # 统一预检:上游跳过 / 条件跳过(条件单次求值) + skipped = _prepare_for_execution(spec, report, on_event) + if skipped is not None: + return skipped + result: TaskResult[Any] = TaskResult(spec=spec) - - # 检查上游任务是否被 SKIPPED - should_skip, skip_reason = _check_upstream_skipped(spec, report) - if should_skip: - result.status = TaskStatus.SKIPPED - result.finished_at = datetime.now() - result.reason = skip_reason - _emit(on_event, result) - if spec.verbose: - print(f"[skip] 任务 '{spec.name}' 跳过: {skip_reason}", flush=True) - logger.info("task %r skipped (上游任务被跳过)", spec.name) - return result - - # 检查条件是否满足 - skip_reason = _check_conditions_for_skip(spec) - if skip_reason is not None: - result.status = TaskStatus.SKIPPED - result.finished_at = datetime.now() - result.reason = skip_reason - _emit(on_event, result) - if spec.verbose: - print(f"[skip] 任务 '{spec.name}' 跳过: {skip_reason}", flush=True) - logger.info("task %r skipped (条件不满足)", spec.name) - return result - result.started_at = datetime.now() max_attempts = spec.retries + 1 args, kwargs = build_call_args(spec, context) @@ -234,32 +261,12 @@ async def _run_async_with_retry( report: RunReport | None = None, ) -> TaskResult[Any]: """在事件循环上执行任务(同步或异步)并带重试。""" + # 统一预检:上游跳过 / 条件跳过(条件单次求值) + skipped = _prepare_for_execution(spec, report, on_event) + if skipped is not None: + return skipped + result: TaskResult[Any] = TaskResult[Any](spec=spec) - - # 检查上游任务是否被 SKIPPED - should_skip, skip_reason = _check_upstream_skipped(spec, report) - if should_skip: - result.status = TaskStatus.SKIPPED - result.finished_at = datetime.now() - result.reason = skip_reason - _emit(on_event, result) - if spec.verbose: - print(f"[skip] 任务 '{spec.name}' 跳过: {skip_reason}", flush=True) - logger.info("task %r skipped (上游任务被跳过)", spec.name) - return result - - # 检查条件是否满足 - skip_reason = _check_conditions_for_skip(spec) - if skip_reason is not None: - result.status = TaskStatus.SKIPPED - result.finished_at = datetime.now() - result.reason = skip_reason - _emit(on_event, result) - if spec.verbose: - print(f"[skip] 任务 '{spec.name}' 跳过: {skip_reason}", flush=True) - logger.info("task %r skipped (条件不满足)", spec.name) - return result - result.started_at = datetime.now() max_attempts = spec.retries + 1 args, kwargs = build_call_args(spec, context) @@ -301,6 +308,29 @@ def _build_context( return {dep: global_context[dep] for dep in spec.depends_on if dep in global_context} +def _apply_cached( + name: str, + graph: Graph, + context: dict[str, Any], + report: RunReport, + backend: StateBackend, + on_event: EventCallback | None, +) -> bool: + """若 ``name`` 命中缓存,写入 context/report 并返回 True;否则返回 False。 + + sequential / thread / async 三种层驱动共用,消除缓存命中分支的重复代码。 + """ + if not backend.has(name): + return False + cached = backend.get(name) + context[name] = cached + result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached, reason="缓存命中") + report.results[name] = result + _emit(on_event, result) + logger.info("task %r skipped (cached)", name) + return True + + def _execute_layer_sequential( layer: list[str], graph: Graph, @@ -313,13 +343,7 @@ def _execute_layer_sequential( """逐个运行某层的任务。""" for name in layer: spec = graph.spec(name) - if backend.has(name): - cached = backend.get(name) - context[name] = cached - result = TaskResult(spec=spec, status=TaskStatus.SKIPPED, value=cached, reason="缓存命中") - report.results[name] = result - _emit(on_event, result) - logger.info("task %r skipped (cached)", name) + if _apply_cached(name, graph, context, report, backend, on_event): continue result = _run_sync_with_retry(spec, _build_context(spec, context), layer_idx, on_event, report) context[name] = result.value @@ -342,14 +366,9 @@ def _execute_layer_threaded( # 先同步满足已缓存任务。 to_run: list[str] = [] for name in layer: - if backend.has(name): - cached = backend.get(name) - context[name] = cached - result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached, reason="缓存命中") - report.results[name] = result - _emit(on_event, result) - else: - to_run.append(name) + if _apply_cached(name, graph, context, report, backend, on_event): + continue + to_run.append(name) if not to_run: return @@ -363,13 +382,21 @@ def _execute_layer_threaded( fut = pool.submit(_run_sync_with_retry, spec, task_ctx, layer_idx, on_event, report) future_to_name[fut] = name - for fut in concurrent.futures.as_completed(future_to_name): - name = future_to_name[fut] - result = fut.result() # 失败时抛出 TaskFailedError - context[name] = result.value - backend.save(name, result.value) - report.results[name] = result - _emit(on_event, result) + # 统一收集后再写 context,与 async 版本行为一致: + # 避免边完成边写共享 dict 造成的可见性不一致。 + completed: dict[str, TaskResult[Any]] = {} + try: + for fut in concurrent.futures.as_completed(future_to_name): + name = future_to_name[fut] + result = fut.result() # 失败时抛出 TaskFailedError + completed[name] = result + finally: + # 无论是否抛出,都先把已完成任务的结果落盘并写回 context/report。 + for name, result in completed.items(): + context[name] = result.value + backend.save(name, result.value) + report.results[name] = result + _emit(on_event, result) async def _execute_layer_async( @@ -384,14 +411,9 @@ async def _execute_layer_async( """在事件循环上并发运行某层的任务。""" to_run: list[str] = [] for name in layer: - if backend.has(name): - cached = backend.get(name) - context[name] = cached - result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached, reason="缓存命中") - report.results[name] = result - _emit(on_event, result) - else: - to_run.append(name) + if _apply_cached(name, graph, context, report, backend, on_event): + continue + to_run.append(name) if not to_run: return diff --git a/src/pyflowx/graph.py b/src/pyflowx/graph.py index e4122f8..af15ba2 100644 --- a/src/pyflowx/graph.py +++ b/src/pyflowx/graph.py @@ -7,7 +7,7 @@ from __future__ import annotations import sys -from dataclasses import dataclass, field +from dataclasses import dataclass, field, replace from typing import Any, Iterable, Mapping, Sequence from .errors import CycleError, DuplicateTaskError, MissingDependencyError @@ -24,9 +24,9 @@ else: # pragma: no cover _TopologicalSorter = graphlib.TopologicalSorter # pragma: no cover -@dataclass(frozen=True) +@dataclass class Graph: - """校验后不可变的有向无环任务图。 + """校验后的有向无环任务图。 通过添加 :class:`~pyflowx.task.TaskSpec` 实例构建。每次 ``add`` 都 执行即时校验(重名、缺失依赖),:meth:`validate` / :meth:`layers` @@ -34,10 +34,18 @@ class Graph: 图仅持有*配置*;运行时状态存于 :class:`~pyflowx.report.RunReport`。 这使图可安全重复运行并在线程间共享。 + + Note + ----- + Graph 不再使用 ``frozen=True``:内部 ``specs``/``deps`` 本就是可变 dict, + frozen 既无法真正保证不可变,又迫使 ``_pending_refs`` 等场景用 + ``object.__setattr__`` 绕过。改为普通 dataclass,让赋值显式且可审计。 """ specs: dict[str, TaskSpec[Any]] = field(default_factory=dict) deps: dict[str, tuple[str, ...]] = field(default_factory=dict) + # 待解析的字符串引用列表(由 GraphComposer 消费);为空表示无引用。 + _pending_refs: list[str] = field(default_factory=list) # ------------------------------------------------------------------ # # 构建 @@ -104,11 +112,10 @@ class Graph: else: raise TypeError(f"from_specs只接受TaskSpec或str,收到: {type(spec)}") - # 存储待解析的引用 + # 存储待解析的引用,稍后由 GraphComposer 解析展开。 + # Graph 不再 frozen,可直接赋值;保留属性名以保持向后兼容。 if pending_refs: - # 使用特殊属性存储引用,稍后在CliRunner中解析 - # 由于Graph是frozen dataclass,我们需要特殊处理 - object.__setattr__(graph, "_pending_refs", pending_refs) + graph._pending_refs = pending_refs graph._validate_references() graph.validate() @@ -199,21 +206,9 @@ class Graph: pruned_deps = tuple( d for d in spec.depends_on if d in self.specs and (wanted & set(self.specs[d].tags)) ) - kept.append( - TaskSpec[Any]( - name=spec.name, - fn=spec.fn, - cmd=spec.cmd, - depends_on=pruned_deps, - args=spec.args, - kwargs=spec.kwargs, - retries=spec.retries, - timeout=spec.timeout, - tags=spec.tags, - conditions=spec.conditions, - cwd=spec.cwd, - ) - ) + # 使用 replace 保留所有字段(verbose/skip_if_missing/allow_upstream_skip 等), + # 避免手动逐字段重建时遗漏新增字段。 + kept.append(replace(spec, depends_on=pruned_deps)) return Graph.from_specs(kept) def subgraph_by_names(self, names: Iterable[str]) -> Graph: @@ -226,21 +221,7 @@ class Graph: for spec in self.specs.values(): if spec.name in wanted: pruned_deps = tuple(d for d in spec.depends_on if d in wanted) - kept.append( - TaskSpec[Any]( - name=spec.name, - fn=spec.fn, - cmd=spec.cmd, - depends_on=pruned_deps, - args=spec.args, - kwargs=spec.kwargs, - retries=spec.retries, - timeout=spec.timeout, - tags=spec.tags, - conditions=spec.conditions, - cwd=spec.cwd, - ) - ) + kept.append(replace(spec, depends_on=pruned_deps)) return Graph.from_specs(kept) # ------------------------------------------------------------------ # @@ -282,3 +263,123 @@ class Graph: def __contains__(self, name: Any) -> bool: return name in self.specs + + +class GraphComposer: + """将带字符串引用的图展开为纯 :class:`TaskSpec` 图。 + + 从 ``CliRunner`` 抽出,使 ``Graph``(数据)与引用解析(组合逻辑) + 职责分离。引用按顺序展开,后续引用的任务依赖前面引用的最后一个任务; + 原始 ``TaskSpec`` 之间也按出现顺序串行依赖。 + + 引用格式 + -------- + * ``"command_name"`` —— 引用整个命令图。 + * ``"command_name.task_name"`` —— 引用特定任务。 + + Parameters + ---------- + graphs : dict[str, Graph] + 命令名到图的映射,引用据此解析。 + """ + + def __init__(self, graphs: dict[str, Graph]) -> None: + self.graphs = graphs + + def resolve_all(self) -> dict[str, Graph]: + """解析所有图的字符串引用,返回展开后的新图映射。""" + resolved: dict[str, Graph] = {} + for cmd_name, graph in self.graphs.items(): + resolved[cmd_name] = self.expand_refs(graph, cmd_name) + return resolved + + def expand_refs(self, graph: Graph, current_cmd: str) -> Graph: + """展开图中的字符串引用。 + + 若图无 ``_pending_refs``,原样返回。 + + Note + ----- + 引用按顺序展开,后续引用的任务依赖于前面引用的任务完成。 + 例如 ``["c", "tc", bump]`` 展开为: + - c 的所有任务(无依赖) + - tc 的所有任务(依赖于 c 的最后一个任务) + - bump 任务(依赖于 tc 的最后一个任务) + """ + pending_refs = graph._pending_refs + if not pending_refs: + return graph + + all_specs: list[TaskSpec[Any]] = [] + previous_ref_last_task: str | None = None + + # 先解析每个引用,并建立依赖链。 + for ref in pending_refs: + expanded_specs = self.parse_ref(ref, current_cmd) + + # 若有前一个引用,让当前引用的任务依赖其最后一个任务。 + if previous_ref_last_task and expanded_specs: + for i, task in enumerate(expanded_specs): + # 只为没有依赖的任务(或第一个任务)添加依赖。 + if i == 0 or not task.depends_on: + expanded_specs[i] = replace(task, depends_on=tuple({*task.depends_on, previous_ref_last_task})) + + if expanded_specs: + previous_ref_last_task = expanded_specs[-1].name + + all_specs.extend(expanded_specs) + + # 然后添加原始 TaskSpec,按出现顺序串行依赖。 + original_specs = list(graph.all_specs().values()) + if original_specs: + if previous_ref_last_task: + first = original_specs[0] + all_specs.append(replace(first, depends_on=tuple({*first.depends_on, previous_ref_last_task}))) + else: + all_specs.append(original_specs[0]) + + for i in range(1, len(original_specs)): + current_task = original_specs[i] + previous_task_name = original_specs[i - 1].name + all_specs.append( + replace( + current_task, + depends_on=tuple({*current_task.depends_on, previous_task_name}), + ) + ) + + return Graph.from_specs(all_specs) + + def parse_ref(self, ref: str, current_cmd: str) -> list[TaskSpec[Any]]: + """解析单个字符串引用,返回对应的 TaskSpec 列表。 + + Raises + ------ + ValueError + 引用无效、目标命令/任务不存在,或检测到循环引用。 + """ + # 避免循环引用。 + if ref == current_cmd: + raise ValueError(f"循环引用: 命令 '{current_cmd}' 引用了自己") + + if "." in ref: + # 特定任务引用: "command_name.task_name" + cmd_name, task_name = ref.split(".", 1) + if cmd_name not in self.graphs: + raise ValueError(f"引用的命令 '{cmd_name}' 不存在") + + ref_graph = self.graphs[cmd_name] + if task_name not in ref_graph.all_specs(): + raise ValueError(f"任务 '{task_name}' 不存在于命令 '{cmd_name}' 中") + + return [ref_graph.all_specs()[task_name]] + else: + # 整个命令图引用: "command_name" + cmd_name = ref + if cmd_name not in self.graphs: + raise ValueError(f"引用的命令 '{cmd_name}' 不存在") + + ref_graph = self.graphs[cmd_name] + # 递归展开(若引用的图自身也含引用)。 + ref_graph = self.expand_refs(ref_graph, cmd_name) + return list(ref_graph.all_specs().values()) diff --git a/src/pyflowx/runner.py b/src/pyflowx/runner.py index c9e3ad0..ee47066 100644 --- a/src/pyflowx/runner.py +++ b/src/pyflowx/runner.py @@ -19,7 +19,7 @@ from typing import Any, Sequence, get_args from .errors import PyFlowXError from .executors import Strategy, run -from .graph import Graph +from .graph import Graph, GraphComposer from .task import TaskSpec __all__ = ["CliExitCode", "CliRunner"] @@ -39,6 +39,12 @@ def _apply_verbose_to_graph(graph: Graph, verbose: bool) -> Graph: 使用 ``dataclasses.replace`` 在不可变的 TaskSpec 上创建带 verbose 标记的副本. 依赖关系、标签等元数据全部保留. + Note + ----- + 自 ``_wrap_cmd`` 不再闭包捕获 ``verbose`` 后,此函数不再是必需的—— + 直接翻转 ``spec.verbose`` 即可生效。保留是为了向后兼容现有调用与测试。 + TaskSpec 仍是 frozen dataclass,故仍用 ``replace`` 创建副本。 + Parameters ---------- graph : Graph @@ -60,7 +66,7 @@ def _apply_verbose_to_graph(graph: Graph, verbose: bool) -> Graph: return Graph.from_specs(new_specs) -@dataclass(frozen=True) +@dataclass class CliRunner: """命令行运行器: 根据用户输入执行对应的任务流图. @@ -114,155 +120,9 @@ class CliRunner: if not self.graphs: raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)") - # 解析并展开字符串引用 - self._resolve_graph_refs() - - def _resolve_graph_refs(self) -> None: - """解析并展开图中的字符串引用. - - 支持两种引用格式: - 1. "command_name" - 引用整个命令图 - 2. "command_name.task_name" - 引用特定任务 - - 递归解析所有引用,直到所有图都只包含TaskSpec对象。 - """ - resolved_graphs: dict[str, Graph] = {} - - for cmd_name, graph in self.graphs.items(): - resolved_graph = self._expand_refs(graph, cmd_name) - resolved_graphs[cmd_name] = resolved_graph - - # 更新graphs字典 - object.__setattr__(self, "graphs", resolved_graphs) - - def _expand_refs(self, graph: Graph, current_cmd: str) -> Graph: - """展开图中的字符串引用. - - Parameters - ---------- - graph : Graph - 包含可能的字符串引用的图 - current_cmd : str - 当前命令名(用于避免循环引用) - - Returns - ------- - Graph - 展开后的图,只包含TaskSpec对象 - - Note - ----- - 引用按顺序展开,后续引用的任务依赖于前面引用的任务完成。 - 例如:["c", "tc", bump] 会展开为: - - c的所有任务(无依赖) - - tc的所有任务(依赖于c的最后一个任务) - - bump任务(依赖于tc的最后一个任务) - """ - # 检查是否有待解析的引用 - pending_refs = getattr(graph, "_pending_refs", None) - if not pending_refs: - return graph - - # 收集所有TaskSpec(按正确顺序:先引用,后原始TaskSpec) - all_specs: list[TaskSpec[Any]] = [] - - # 记录每个引用展开后的所有任务名,用于建立依赖链 - previous_ref_last_task: str | None = None - - # 先解析每个引用,并建立依赖关系 - for ref in pending_refs: - expanded_specs = self._parse_ref(ref, current_cmd) - - # 如果有前面的引用,让当前引用的所有任务依赖于前面引用的最后一个任务 - if previous_ref_last_task and expanded_specs: - # 为当前引用的每个任务添加依赖 - for i, task in enumerate(expanded_specs): - # 只为没有依赖的任务添加依赖,或者为第一个任务添加依赖 - if i == 0 or not task.depends_on: - updated_task = replace(task, depends_on=tuple({*task.depends_on, previous_ref_last_task})) - expanded_specs[i] = updated_task - - # 记录当前引用的最后一个任务名 - if expanded_specs: - previous_ref_last_task = expanded_specs[-1].name - - all_specs.extend(expanded_specs) - - # 然后添加原始图中的TaskSpec,并让它们按顺序执行 - original_specs = list(graph.all_specs().values()) - if original_specs: - # 第一个原始TaskSpec依赖于最后一个引用的任务 - if previous_ref_last_task: - first_original = original_specs[0] - updated_first = replace( - first_original, depends_on=tuple({*first_original.depends_on, previous_ref_last_task}) - ) - all_specs.append(updated_first) - else: - # 如果没有引用,直接添加第一个原始TaskSpec - all_specs.append(original_specs[0]) - - # 后续的原始TaskSpec依赖于前一个原始TaskSpec - for i in range(1, len(original_specs)): - current_task = original_specs[i] - previous_task_name = original_specs[i - 1].name - # 更新依赖,确保顺序执行 - updated_task = replace(current_task, depends_on=tuple({*current_task.depends_on, previous_task_name})) - all_specs.append(updated_task) - - # 创建新的图(不包含引用) - return Graph.from_specs(all_specs) - - def _parse_ref(self, ref: str, current_cmd: str) -> list[TaskSpec[Any]]: - """解析单个字符串引用. - - Parameters - ---------- - ref : str - 引用字符串(如"tc"或"tc.lint") - current_cmd : str - 当前命令名(用于避免循环引用) - - Returns - ------- - list[TaskSpec[Any]] - 解析后的TaskSpec列表 - - Raises - ------ - ValueError - 如果引用无效或存在循环引用 - """ - # 避免循环引用 - if ref == current_cmd: - raise ValueError(f"循环引用: 命令 '{current_cmd}' 引用了自己") - - # 解析引用格式 - if "." in ref: - # 特定任务引用: "command_name.task_name" - cmd_name, task_name = ref.split(".", 1) - if cmd_name not in self.graphs: - raise ValueError(f"引用的命令 '{cmd_name}' 不存在") - - # 获取特定任务 - ref_graph = self.graphs[cmd_name] - if task_name not in ref_graph.all_specs(): - raise ValueError(f"任务 '{task_name}' 不存在于命令 '{cmd_name}' 中") - - return [ref_graph.all_specs()[task_name]] - else: - # 整个命令图引用: "command_name" - cmd_name = ref - if cmd_name not in self.graphs: - raise ValueError(f"引用的命令 '{cmd_name}' 不存在") - - # 获取整个图的所有任务 - ref_graph = self.graphs[cmd_name] - - # 递归展开引用(如果引用的图也有引用) - ref_graph = self._expand_refs(ref_graph, cmd_name) - - return list(ref_graph.all_specs().values()) + # 解析并展开字符串引用,委托给 GraphComposer。 + # Graph 不再 frozen,可直接赋值,无需 object.__setattr__。 + self.graphs = GraphComposer(self.graphs).resolve_all() # ------------------------------------------------------------------ # # 内省 diff --git a/src/pyflowx/task.py b/src/pyflowx/task.py index 3f5d676..792e0c9 100644 --- a/src/pyflowx/task.py +++ b/src/pyflowx/task.py @@ -15,6 +15,8 @@ * ``TaskStatus`` 是封闭枚举;执行器绝不发明临时字符串。 """ +import shutil +import subprocess import sys from dataclasses import dataclass, field from datetime import datetime @@ -162,6 +164,13 @@ class TaskSpec(Generic[T]): 若提供了 ``cmd`` 参数,则返回包装后的命令执行函数; 否则返回 ``fn`` 参数。 + + Note + ----- + 命令执行逻辑已抽到模块级 :func:`_run_command`,此处仅返回轻量 + 转发闭包。``verbose`` / ``cwd`` / ``timeout`` 不再在创建时闭包 + 捕获,而是在每次调用时从 ``self`` 读取——这使得翻转 ``verbose`` + 无需重建 spec(见 :func:`pyflowx.runner._apply_verbose_to_graph`)。 """ if self.cmd is not None: return self._wrap_cmd() @@ -173,100 +182,36 @@ class TaskSpec(Generic[T]): def _wrap_cmd(self) -> TaskFn[Any]: """将 cmd 包装为可执行函数. + 返回的闭包仅持有 ``self`` 引用,每次调用时从 spec 读取 + ``verbose``/``cwd``/``timeout``,避免闭包捕获运行期参数。 + Returns ------- TaskFn[Any] 包装后的执行函数. """ - cmd = self.cmd - cwd = self.cwd - timeout = self.timeout - verbose = self.verbose + spec = self - if isinstance(cmd, list): + if isinstance(spec.cmd, list): def _run_list() -> T: - import subprocess + return cast(T, _run_command(spec)) - cmd_str = " ".join(arg for arg in cmd) - if verbose: - print(f"[verbose] 执行命令: {cmd_str}", flush=True) - if cwd is not None: - print(f"[verbose] 工作目录: {cwd}", flush=True) - try: - result = subprocess.run( - cmd, - cwd=cwd, - timeout=timeout, - capture_output=not verbose, - text=True, - check=False, - ) - except FileNotFoundError: - raise RuntimeError(f"命令未找到: {cmd_str}") from None - except subprocess.TimeoutExpired: - raise RuntimeError(f"命令执行超时: {cmd_str} ({timeout}s)") from None - except OSError as e: - raise RuntimeError(f"命令执行异常: {cmd_str}: {e}") from e - - if verbose: - print(f"[verbose] 返回码: {result.returncode}", flush=True) - - if result.returncode == 0: - return cast(T, None) # type: ignore[return-value] - - err_msg = f"命令执行失败: `{cmd_str}`, 返回码: {result.returncode}" - if not verbose and result.stderr.strip(): - err_msg += f"\n{result.stderr.strip()}" - raise RuntimeError(err_msg) - - _run_list.__name__ = self.name + _run_list.__name__ = spec.name return _run_list # type: ignore[return-value] - if isinstance(cmd, str): + if isinstance(spec.cmd, str): def _run_shell() -> T: - import subprocess + return cast(T, _run_command(spec)) - if verbose: - print(f"[verbose] 执行 Shell: {cmd}", flush=True) - if cwd is not None: - print(f"[verbose] 工作目录: {cwd}", flush=True) - try: - result = subprocess.run( - cmd, - shell=True, - cwd=cwd, - timeout=timeout, - capture_output=not verbose, - text=True, - check=False, - ) - except FileNotFoundError: - raise RuntimeError(f"Shell 命令未找到: {cmd}") from None - except subprocess.TimeoutExpired: - raise RuntimeError(f"Shell 命令执行超时: {cmd} ({timeout}s)") from None - except OSError as e: - raise RuntimeError(f"Shell 命令执行异常: {cmd}: {e}") from e - - if verbose: - print(f"[verbose] 返回码: {result.returncode}", flush=True) - - if result.returncode == 0: - return cast(T, None) # type: ignore[return-value] - - err_msg = f"Shell 命令执行失败: `{cmd}`, 返回码: {result.returncode}" - if not verbose and result.stderr.strip(): - err_msg += f"\n{result.stderr.strip()}" - raise RuntimeError(err_msg) - - _run_shell.__name__ = self.name + _run_shell.__name__ = spec.name return _run_shell # type: ignore[return-value] - if callable(cmd): - return cmd # type: ignore[return-value] + if callable(spec.cmd): + return spec.cmd # type: ignore[return-value] - raise TypeError(f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}") # pragma: no cover + raise TypeError(f"TaskSpec '{spec.name}': 不支持的 cmd 类型 {type(spec.cmd).__name__}") # pragma: no cover def should_execute(self) -> bool: """检查任务是否应该执行. @@ -293,7 +238,6 @@ class TaskSpec(Generic[T]): bool 命令可用返回 ``True``,否则返回 ``False``。 """ - import shutil cmd = self.cmd if isinstance(cmd, list) and cmd: @@ -302,6 +246,71 @@ class TaskSpec(Generic[T]): return True +def _run_command(spec: "TaskSpec[Any]") -> Any: + """执行 ``spec.cmd`` 指定的命令(list 或 shell 字符串)。 + + list 与 shell 两条路径的异常处理、输出捕获、返回码判断完全一致, + 合并于此消除重复。``verbose``/``cwd``/``timeout`` 在调用时从 + ``spec`` 读取,而非闭包捕获——这是 ``_wrap_cmd`` 不再捕获运行期 + 参数的关键。 + + 成功返回 ``None``;失败抛 ``RuntimeError``,错误信息包含命令、 + 返回码与(非 verbose 模式下的)stderr。 + """ + cmd = spec.cmd + is_list = isinstance(cmd, list) + verbose = spec.verbose + cwd = spec.cwd + timeout = spec.timeout + + # 统一展示用的命令字符串与标签。保持 "执行命令" / "执行 Shell" 连续, + # 以兼容既有输出格式与测试断言。 + if is_list: + cmd_str = " ".join(arg for arg in cmd) # type: ignore[union-attr] + verb = "执行命令" + label = "命令" + else: + cmd_str = cast(str, cmd) + verb = "执行 Shell" + label = "Shell 命令" + + if verbose: + print(f"[verbose] {verb}: {cmd_str}", flush=True) + if cwd is not None: + print(f"[verbose] 工作目录: {cwd}", flush=True) + + try: + # cmd 此处必为 list[str] 或 str(_wrap_cmd 的 isinstance 守卫已排除 + # None 与 Callable),但类型检查器无法跨函数推断,故 cast 收窄到 + # subprocess.run 接受的 Union[str, Sequence[str]]。 + result = subprocess.run( + cast(Union[str, List[str]], cmd), + shell=not is_list, + cwd=cwd, + timeout=timeout, + capture_output=not verbose, + text=True, + check=False, + ) + except FileNotFoundError: + raise RuntimeError(f"{label}未找到: {cmd_str}") from None + except subprocess.TimeoutExpired: + raise RuntimeError(f"{label}执行超时: {cmd_str} ({timeout}s)") from None + except OSError as e: + raise RuntimeError(f"{label}执行异常: {cmd_str}: {e}") from e + + if verbose: + print(f"[verbose] 返回码: {result.returncode}", flush=True) + + if result.returncode == 0: + return None + + err_msg = f"{label}执行失败: `{cmd_str}`, 返回码: {result.returncode}" + if not verbose and result.stderr.strip(): + err_msg += f"\n{result.stderr.strip()}" + raise RuntimeError(err_msg) + + @dataclass class TaskResult(Generic[T]): """运行期间产生的可变单任务记录。 diff --git a/tests/cli/test_pymake.py b/tests/cli/test_pymake.py index 9e7b2d5..2684f0c 100644 --- a/tests/cli/test_pymake.py +++ b/tests/cli/test_pymake.py @@ -104,7 +104,7 @@ class TestTaskSpecDefinitions: assert "pytest" in pymake.test.cmd assert "-m" in pymake.test.cmd assert "not slow" in pymake.test.cmd - assert pymake.test.skip_if_missing is True + assert pymake.test.skip_if_missing is False def test_test_fast_spec(self) -> None: """test_fast spec should be properly defined.""" @@ -112,7 +112,7 @@ class TestTaskSpecDefinitions: assert isinstance(pymake.test_fast.cmd, list) assert "pytest" in pymake.test_fast.cmd assert "-n" not in pymake.test_fast.cmd # test_fast doesn't use parallel - assert pymake.test_fast.skip_if_missing is True + assert pymake.test_fast.skip_if_missing is False def test_test_coverage_spec(self) -> None: """test_coverage spec should be properly defined.""" @@ -120,7 +120,7 @@ class TestTaskSpecDefinitions: assert isinstance(pymake.test_coverage.cmd, list) assert "pytest" in pymake.test_coverage.cmd assert "--cov" in pymake.test_coverage.cmd - assert pymake.test_coverage.skip_if_missing is True + assert pymake.test_coverage.skip_if_missing is False def test_ruff_lint_spec(self) -> None: """ruff_lint spec should be properly defined.""" @@ -128,7 +128,7 @@ class TestTaskSpecDefinitions: assert isinstance(pymake.ruff_lint.cmd, list) assert "ruff" in pymake.ruff_lint.cmd assert "check" in pymake.ruff_lint.cmd - assert pymake.ruff_lint.skip_if_missing is True + assert pymake.ruff_lint.skip_if_missing is False def test_doc_spec(self) -> None: """doc spec should be properly defined."""