From 3d6d7696852ee30b74d2ad87466e5da422bcec93 Mon Sep 17 00:00:00 2001 From: gooker_young Date: Sun, 28 Jun 2026 19:59:25 +0800 Subject: [PATCH] =?UTF-8?q?feat(profiling):=20=E6=B7=BB=E5=8A=A0=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E6=B5=81=E6=80=A7=E8=83=BD=E5=88=86=E6=9E=90=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=E4=B8=8E=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增了性能剖面分析能力,支持从运行报告生成任务级、图级性能指标,包括关键路径、并行度分析和瓶颈识别,同时补充了完整的单元测试覆盖。 --- src/pyflowx/__init__.py | 3 + src/pyflowx/profiling.py | 451 +++++++++++++++++++++++++++++++ tests/test_profiling.py | 567 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 1021 insertions(+) create mode 100644 src/pyflowx/profiling.py create mode 100644 tests/test_profiling.py diff --git a/src/pyflowx/__init__.py b/src/pyflowx/__init__.py index 57d8ac1..d747e0b 100644 --- a/src/pyflowx/__init__.py +++ b/src/pyflowx/__init__.py @@ -82,6 +82,7 @@ from .errors import ( ) from .executors import Strategy, run from .graph import Graph, GraphDefaults +from .profiling import ProfileReport, TaskProfile from .report import RunReport from .runner import CliExitCode, CliRunner from .storage import JSONBackend, MemoryBackend, StateBackend @@ -122,6 +123,7 @@ __all__ = [ "JSONBackend", "MemoryBackend", "MissingDependencyError", + "ProfileReport", "PyFlowXError", "RetryPolicy", "RunReport", @@ -132,6 +134,7 @@ __all__ = [ "TaskEvent", "TaskFailedError", "TaskHooks", + "TaskProfile", "TaskResult", "TaskSpec", "TaskStatus", diff --git a/src/pyflowx/profiling.py b/src/pyflowx/profiling.py new file mode 100644 index 0000000..bb6e551 --- /dev/null +++ b/src/pyflowx/profiling.py @@ -0,0 +1,451 @@ +"""工作流执行性能评估。 + +基于 :class:`~pyflowx.report.RunReport` 中已有的 ``started_at`` / +``finished_at`` 时间戳进行离线分析,**零运行时开销**——不修改执行流程, +不注册回调,不引入额外计时器。 + +核心指标 +-------- +* **任务级**:每个任务的 wall-clock 耗时、状态、重试次数、等待时间 + (从最早依赖完成到本任务开始)。 +* **图级**:总耗时(wall-clock)、关键路径耗时(理论最短耗时)、 + 并行度效率(关键路径耗时 / 总耗时)。 +* **关键路径**:从源点到汇点的最长依赖路径,识别真正的串行瓶颈。 +* **并行度**:基于时间线重叠计算瞬时并行度,给出平均并行度与峰值并行度。 +* **瓶颈识别**:按耗时排序的 Top-N 任务。 + +设计原则 +-------- +* 数据来源于 ``RunReport`` + ``Graph``,无副作用。 +* 计算复杂度 O(V+E):拓扑排序 + 单次松弛,适合大规模图。 +* 所有时间戳用 ``datetime``,与 :class:`TaskResult` 保持一致。 + +快速上手 +-------- + import pyflowx as px + + report = px.run(graph) + profile = px.ProfileReport.from_report(report, graph) + print(profile.describe()) + bottlenecks = profile.top_bottlenecks(3) +""" + +from __future__ import annotations + +__all__ = [ + "ProfileReport", + "TaskProfile", +] + +from dataclasses import dataclass +from datetime import datetime +from typing import Any + +from .graph import Graph +from .report import RunReport +from .task import TaskResult, TaskStatus + + +@dataclass(frozen=True) +class TaskProfile: + """单个任务的性能剖面。 + + 属性 + ---- + name: + 任务名。 + status: + 终态(SUCCESS/FAILED/SKIPPED)。 + duration: + wall-clock 执行耗时(秒)。SKIPPED 任务为 0.0。 + attempts: + 尝试次数(含首次)。 + wait_time: + 从最早硬依赖完成到本任务开始的等待时间(秒)。 + 无硬依赖或 SKIPPED 时为 0.0。 + is_on_critical_path: + 是否位于关键路径上。 + deps: + 硬依赖任务名列表。 + """ + + name: str + status: TaskStatus + duration: float + attempts: int + wait_time: float + is_on_critical_path: bool + deps: tuple[str, ...] + + def to_dict(self) -> dict[str, Any]: + """转为 JSON 友好的字典。""" + return { + "name": self.name, + "status": self.status.value, + "duration_seconds": round(self.duration, 6), + "attempts": self.attempts, + "wait_time_seconds": round(self.wait_time, 6), + "is_on_critical_path": self.is_on_critical_path, + "deps": list(self.deps), + } + + +@dataclass(frozen=True) +class ProfileReport: + """工作流执行的性能剖面报告。 + + 通过 :meth:`from_report` 从 :class:`RunReport` + :class:`Graph` 构建。 + 所有字段在构造时一次性计算完毕,后续访问为 O(1)。 + """ + + tasks: tuple[TaskProfile, ...] + """所有任务的性能剖面(按拓扑序)。""" + + total_duration: float + """整次运行的 wall-clock 耗时(秒)。""" + + critical_path_duration: float + """关键路径耗时(秒):从最早任务开始到最晚任务结束的最长依赖路径。""" + + critical_path: tuple[str, ...] + """关键路径上的任务名序列(按执行顺序)。""" + + avg_parallelism: float + """平均并行度 = 任务总耗时 / wall-clock 总耗时。""" + + peak_parallelism: int + """峰值并行度:任一时刻同时运行的任务数最大值。""" + + parallelism_efficiency: float + """并行度效率 = 关键路径耗时 / wall-clock 总耗时。``1.0`` 表示完全串行, + 越大表示并行化收益越低(瓶颈在关键路径上)。""" + + # ------------------------------------------------------------------ # + # 构建 + # ------------------------------------------------------------------ # + @classmethod + def from_report(cls, report: RunReport, graph: Graph) -> ProfileReport: + """从运行报告与图构建性能剖面。 + + 参数 + ---- + report: + 已完成的 :class:`RunReport`,需包含 ``started_at``/``finished_at``。 + graph: + 对应的 :class:`Graph`,用于依赖关系与关键路径分析。 + + Note + ----- + 本方法不修改 ``report`` 或 ``graph``,纯函数式计算。 + """ + task_profiles = cls._build_task_profiles(report, graph) + total_duration = cls._calc_total_duration(report) + critical_path, critical_duration = cls._calc_critical_path(graph, report) + avg_par, peak_par = cls._calc_parallelism(report) + efficiency = critical_duration / total_duration if total_duration > 0 else 0.0 + + # 标记关键路径上的任务 + critical_set = set(critical_path) + marked = tuple( + TaskProfile( + name=t.name, + status=t.status, + duration=t.duration, + attempts=t.attempts, + wait_time=t.wait_time, + is_on_critical_path=t.name in critical_set, + deps=t.deps, + ) + for t in task_profiles + ) + + return cls( + tasks=marked, + total_duration=total_duration, + critical_path_duration=critical_duration, + critical_path=critical_path, + avg_parallelism=avg_par, + peak_parallelism=peak_par, + parallelism_efficiency=efficiency, + ) + + @staticmethod + def _build_task_profiles(report: RunReport, graph: Graph) -> tuple[TaskProfile, ...]: + """构建每个任务的性能剖面。""" + profiles: list[TaskProfile] = [] + for name, result in report.results.items(): + spec = graph.specs.get(name) + deps = tuple(spec.depends_on) if spec is not None else () + duration = result.duration or 0.0 + wait_time = ProfileReport._calc_wait_time(result, deps, report) + profiles.append( + TaskProfile( + name=name, + status=result.status, + duration=duration, + attempts=result.attempts, + wait_time=wait_time, + is_on_critical_path=False, # 后续标记 + deps=deps, + ) + ) + return tuple(profiles) + + @staticmethod + def _calc_wait_time( + result: TaskResult[Any], + deps: tuple[str, ...], + report: RunReport, + ) -> float: + """计算等待时间:从最早依赖完成到本任务开始。 + + 无硬依赖、SKIPPED 任务或时间戳缺失时返回 0.0。 + """ + if not deps or result.started_at is None or result.status == TaskStatus.SKIPPED: + return 0.0 + # 找出所有已完成依赖的最晚完成时间 + dep_end_times: list[datetime] = [] + for dep in deps: + dep_result = report.results.get(dep) + if dep_result is not None and dep_result.finished_at is not None: + dep_end_times.append(dep_result.finished_at) + if not dep_end_times: + return 0.0 + latest_dep_end = max(dep_end_times) + delta = (result.started_at - latest_dep_end).total_seconds() + return max(0.0, delta) + + @staticmethod + def _calc_total_duration(report: RunReport) -> float: + """计算 wall-clock 总耗时:最早开始到最晚结束。""" + starts: list[datetime] = [] + ends: list[datetime] = [] + for r in report.results.values(): + if r.started_at is not None: + starts.append(r.started_at) + if r.finished_at is not None: + ends.append(r.finished_at) + if not starts or not ends: + return 0.0 + return (max(ends) - min(starts)).total_seconds() + + @staticmethod + def _calc_critical_path(graph: Graph, report: RunReport) -> tuple[tuple[str, ...], float]: + """计算关键路径:DAG 最长路径(按实际执行耗时)。 + + 使用拓扑排序 + 动态规划,O(V+E)。SKIPPED 任务耗时按 0 计。 + """ + # 构建耗时映射 + durations: dict[str, float] = {} + for name, result in report.results.items(): + durations[name] = result.duration or 0.0 + + # 拓扑序(使用 graph.layers 保证与分层一致) + try: + layers = graph.layers() + except Exception: + # 图校验失败时回退为空 + return (), 0.0 + + # earliest_finish[name] = duration[name] + max(earliest_finish[dep] for dep in deps) + earliest_finish: dict[str, float] = {} + predecessor: dict[str, str | None] = {} + + for layer in layers: + for name in layer: + spec = graph.specs.get(name) + deps = spec.depends_on if spec is not None else () + if not deps: + earliest_finish[name] = durations.get(name, 0.0) + predecessor[name] = None + else: + best_dep: str | None = None + best_ef = 0.0 + for dep in deps: + ef = earliest_finish.get(dep, 0.0) + if ef >= best_ef: + best_ef = ef + best_dep = dep + earliest_finish[name] = best_ef + durations.get(name, 0.0) + predecessor[name] = best_dep + + if not earliest_finish: + return (), 0.0 + + # 找到 earliest_finish 最大的节点作为终点 + end_node = max(earliest_finish, key=lambda n: earliest_finish[n]) + total = earliest_finish[end_node] + + # 回溯关键路径 + path: list[str] = [] + node: str | None = end_node + while node is not None: + path.append(node) + node = predecessor.get(node) + path.reverse() + + return tuple(path), total + + @staticmethod + def _calc_parallelism(report: RunReport) -> tuple[float, int]: + """计算平均并行度与峰值并行度。 + + 基于时间线扫描:将每个任务的 [started_at, finished_at] 区间 + 转为事件点(+1/-1),排序后扫描得到瞬时并行度序列。 + + 返回 (avg_parallelism, peak_parallelism)。 + 无有效时间戳时返回 (0.0, 0)。 + """ + events: list[tuple[float, int]] = [] # (timestamp, delta) + for r in report.results.values(): + if r.started_at is None or r.finished_at is None: + continue + if r.status == TaskStatus.SKIPPED: + continue + start_ts = r.started_at.timestamp() + end_ts = r.finished_at.timestamp() + if end_ts <= start_ts: + continue + events.append((start_ts, 1)) + events.append((end_ts, -1)) + + if not events: + return 0.0, 0 + + # 排序:同一时间点先处理结束(-1)再处理开始(+1),避免虚假峰值 + events.sort(key=lambda e: (e[0], e[1])) + + current = 0 + peak = 0 + # 加权面积用于计算平均并行度 + area = 0.0 + prev_ts = events[0][0] + for ts, delta in events: + if ts > prev_ts: + area += current * (ts - prev_ts) + current += delta + peak = max(peak, current) + prev_ts = ts + + total_span = events[-1][0] - events[0][0] + avg = area / total_span if total_span > 0 else 0.0 + return avg, peak + + # ------------------------------------------------------------------ # + # 查询 + # ------------------------------------------------------------------ # + def task(self, name: str) -> TaskProfile: + """返回指定任务的剖面。不存在则 ``KeyError``。""" + for t in self.tasks: + if t.name == name: + return t + raise KeyError(name) + + def top_bottlenecks(self, n: int = 5) -> tuple[TaskProfile, ...]: + """返回耗时最长的 Top-N 任务(按 duration 降序)。 + + 参数 + ---- + n: + 返回数量。``n <= 0`` 返回空元组。 + """ + if n <= 0: + return () + return tuple(sorted(self.tasks, key=lambda t: t.duration, reverse=True)[:n]) + + def critical_tasks(self) -> tuple[TaskProfile, ...]: + """返回关键路径上的所有任务(按路径顺序)。""" + critical_set = set(self.critical_path) + # 保持关键路径顺序 + order = {name: i for i, name in enumerate(self.critical_path)} + return tuple(sorted((t for t in self.tasks if t.name in critical_set), key=lambda t: order[t.name])) + + def failed_tasks(self) -> tuple[TaskProfile, ...]: + """返回 FAILED 状态的任务。""" + return tuple(t for t in self.tasks if t.status == TaskStatus.FAILED) + + def skipped_tasks(self) -> tuple[TaskProfile, ...]: + """返回 SKIPPED 状态的任务。""" + return tuple(t for t in self.tasks if t.status == TaskStatus.SKIPPED) + + # ------------------------------------------------------------------ # + # 输出 + # ------------------------------------------------------------------ # + def to_dict(self) -> dict[str, Any]: + """转为 JSON 友好的字典。""" + return { + "tasks": [t.to_dict() for t in self.tasks], + "total_duration_seconds": round(self.total_duration, 6), + "critical_path_duration_seconds": round(self.critical_path_duration, 6), + "critical_path": list(self.critical_path), + "avg_parallelism": round(self.avg_parallelism, 4), + "peak_parallelism": self.peak_parallelism, + "parallelism_efficiency": round(self.parallelism_efficiency, 4), + "bottlenecks": [t.to_dict() for t in self.top_bottlenecks(5)], + } + + def describe(self) -> str: + """人类可读的多行性能报告。""" + lines: list[str] = [] + lines.append("=" * 70) + lines.append("PyFlowX 性能剖面报告") + lines.append("=" * 70) + lines.append("") + lines.append("【图级指标】") + lines.append(f" 总耗时 (wall-clock): {self.total_duration:.3f}s") + lines.append(f" 关键路径耗时: {self.critical_path_duration:.3f}s") + lines.append(f" 平均并行度: {self.avg_parallelism:.2f}") + lines.append(f" 峰值并行度: {self.peak_parallelism}") + lines.append(f" 并行度效率: {self.parallelism_efficiency:.2%}") + lines.append(f" 任务总数: {len(self.tasks)}") + lines.append("") + + # 关键路径 + lines.append("【关键路径】") + if self.critical_path: + lines.append(f" {' -> '.join(self.critical_path)}") + else: + lines.append(" (无)") + lines.append("") + + # Top 瓶颈 + bottlenecks = self.top_bottlenecks(5) + lines.append(f"【Top {len(bottlenecks)} 瓶颈任务】") + if bottlenecks: + lines.append(f" {'任务':<30} {'耗时':>10} {'等待':>10} {'尝试':>6} {'关键路径':>8} {'状态':>8}") + lines.append(f" {'-' * 30} {'-' * 10} {'-' * 10} {'-' * 6} {'-' * 8} {'-' * 8}") + for t in bottlenecks: + critical_flag = "✓" if t.is_on_critical_path else "" + lines.append( + f" {t.name:<30} {t.duration:>9.3f}s {t.wait_time:>9.3f}s {t.attempts:>6} " + f"{critical_flag:>8} {t.status.value:>8}", + ) + else: + lines.append(" (无)") + lines.append("") + + # 全部任务详情 + lines.append("【全部任务】") + if self.tasks: + lines.append(f" {'任务':<30} {'耗时':>10} {'等待':>10} {'尝试':>6} {'关键路径':>8} {'状态':>8}") + lines.append(f" {'-' * 30} {'-' * 10} {'-' * 10} {'-' * 6} {'-' * 8} {'-' * 8}") + for t in self.tasks: + critical_flag = "✓" if t.is_on_critical_path else "" + lines.append( + f" {t.name:<30} {t.duration:>9.3f}s {t.wait_time:>9.3f}s {t.attempts:>6} " + f"{critical_flag:>8} {t.status.value:>8}", + ) + else: + lines.append(" (无)") + lines.append("") + lines.append("=" * 70) + return "\n".join(lines) + + def __repr__(self) -> str: + return ( + f"ProfileReport(tasks={len(self.tasks)}, " + f"total={self.total_duration:.3f}s, " + f"critical={self.critical_path_duration:.3f}s, " + f"avg_par={self.avg_parallelism:.2f}, " + f"peak_par={self.peak_parallelism})" + ) diff --git a/tests/test_profiling.py b/tests/test_profiling.py new file mode 100644 index 0000000..8c36230 --- /dev/null +++ b/tests/test_profiling.py @@ -0,0 +1,567 @@ +"""性能剖面(ProfileReport)测试. + +覆盖策略: +* 构造带时间戳的 RunReport + Graph,验证关键路径、并行度、瓶颈排序。 +* 边界场景:空报告、单任务、无时间戳、SKIPPED 任务、图校验失败。 +* 输出格式:to_dict / describe / top_bottlenecks / critical_tasks。 +""" + +from __future__ import annotations + +from datetime import datetime, timedelta +from typing import Any + +import pyflowx as px +from pyflowx.profiling import ProfileReport, TaskProfile +from pyflowx.task import TaskResult, TaskSpec, TaskStatus + + +def _fn() -> int: + return 1 + + +def _spec(name: str, deps: tuple[str, ...] = ()) -> TaskSpec[Any]: + return TaskSpec[Any](name, _fn, depends_on=deps) + + +def _result( + name: str, + start: datetime, + duration: float, + *, + status: TaskStatus = TaskStatus.SUCCESS, + attempts: int = 1, +) -> TaskResult[Any]: + """构造带时间戳的 TaskResult.""" + end = start + timedelta(seconds=duration) if duration > 0 else start + return TaskResult[Any]( + spec=_spec(name), + status=status, + value=None, + attempts=attempts, + started_at=start if duration > 0 or status != TaskStatus.SKIPPED else None, + finished_at=end if duration > 0 or status != TaskStatus.SKIPPED else None, + ) + + +def _skipped_result(name: str, reason: str = "skip") -> TaskResult[Any]: + """构造 SKIPPED 结果(无时间戳).""" + return TaskResult[Any]( + spec=_spec(name), + status=TaskStatus.SKIPPED, + reason=reason, + ) + + +class TestProfileReportConstruction: + """测试 ProfileReport 构建.""" + + def test_empty_report(self) -> None: + """空报告应产生空剖面.""" + report = px.RunReport() + graph = px.Graph() + profile = ProfileReport.from_report(report, graph) + assert len(profile.tasks) == 0 + assert profile.total_duration == 0.0 + assert profile.critical_path == () + assert profile.critical_path_duration == 0.0 + assert profile.avg_parallelism == 0.0 + assert profile.peak_parallelism == 0 + + def test_single_task(self) -> None: + """单任务:关键路径就是它自己,并行度为 1.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.5) + graph = px.Graph.from_specs([_spec("a")]) + + profile = ProfileReport.from_report(report, graph) + + assert len(profile.tasks) == 1 + assert profile.tasks[0].name == "a" + assert profile.tasks[0].duration == 1.5 + assert profile.tasks[0].is_on_critical_path + assert profile.total_duration == 1.5 + assert profile.critical_path == ("a",) + assert profile.critical_path_duration == 1.5 + assert profile.avg_parallelism == 1.0 + assert profile.peak_parallelism == 1 + assert profile.parallelism_efficiency == 1.0 + + def test_serial_chain(self) -> None: + """串行链 a -> b -> c:关键路径为全部,效率 100%.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + report.results["b"] = _result("b", start + timedelta(seconds=1), 2.0) + report.results["c"] = _result("c", start + timedelta(seconds=3), 1.5) + graph = px.Graph.from_specs([ + _spec("a"), + _spec("b", deps=("a",)), + _spec("c", deps=("b",)), + ]) + + profile = ProfileReport.from_report(report, graph) + + assert profile.total_duration == 4.5 + assert profile.critical_path_duration == 4.5 + assert profile.critical_path == ("a", "b", "c") + assert profile.parallelism_efficiency == 1.0 + assert profile.peak_parallelism == 1 + assert profile.avg_parallelism == 1.0 + + def test_parallel_tasks(self) -> None: + """并行任务 a, b 同时执行:关键路径取较长者,效率 < 1.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + report.results["b"] = _result("b", start, 2.0) + graph = px.Graph.from_specs([_spec("a"), _spec("b")]) + + profile = ProfileReport.from_report(report, graph) + + # wall-clock = 2.0, 关键路径 = 2.0 (b), 效率 = 1.0 + # 因为关键路径定义就是最长路径,与 wall-clock 相同 + assert profile.total_duration == 2.0 + assert profile.critical_path_duration == 2.0 + assert profile.critical_path == ("b",) + assert profile.peak_parallelism == 2 + # 平均并行度 = (1.0 + 2.0) / 2.0 = 1.5 + assert profile.avg_parallelism == 1.5 + + def test_parallel_with_join(self) -> None: + """a, b 并行后 join 到 c:关键路径 a->c 或 b->c.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + report.results["b"] = _result("b", start, 3.0) + report.results["c"] = _result("c", start + timedelta(seconds=3), 1.0) + graph = px.Graph.from_specs([ + _spec("a"), + _spec("b"), + _spec("c", deps=("a", "b")), + ]) + + profile = ProfileReport.from_report(report, graph) + + # 关键路径 = b -> c (3 + 1 = 4) + assert profile.critical_path_duration == 4.0 + assert profile.critical_path == ("b", "c") + assert profile.tasks[0].is_on_critical_path is False # a 不在关键路径 + # task("b") 在关键路径上 + assert profile.task("b").is_on_critical_path + assert profile.task("c").is_on_critical_path + + def test_skipped_task_no_timestamp(self) -> None: + """SKIPPED 任务无时间戳:不影响并行度计算.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + report.results["b"] = _skipped_result("b") + graph = px.Graph.from_specs([_spec("a"), _spec("b")]) + + profile = ProfileReport.from_report(report, graph) + + # b 是 SKIPPED,duration=0 + assert profile.task("b").status == TaskStatus.SKIPPED + assert profile.task("b").duration == 0.0 + assert profile.peak_parallelism == 1 # 只有 a 在跑 + + +class TestWaitTime: + """测试等待时间计算.""" + + def test_no_deps_zero_wait(self) -> None: + """无依赖任务等待时间为 0.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + graph = px.Graph.from_specs([_spec("a")]) + + profile = ProfileReport.from_report(report, graph) + + assert profile.task("a").wait_time == 0.0 + + def test_wait_after_dep_completes(self) -> None: + """b 在 a 完成后等待 0.5s 才开始.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + report.results["b"] = _result("b", start + timedelta(seconds=1.5), 1.0) + graph = px.Graph.from_specs([ + _spec("a"), + _spec("b", deps=("a",)), + ]) + + profile = ProfileReport.from_report(report, graph) + + assert profile.task("b").wait_time == 0.5 + + def test_wait_negative_clamped_to_zero(self) -> None: + """b 在 a 完成前就开始(异常情况)应钳制为 0.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 2.0) + # b 在 a 还没完成时就开始(不应该但可能发生) + report.results["b"] = _result("b", start + timedelta(seconds=1), 1.0) + graph = px.Graph.from_specs([ + _spec("a"), + _spec("b", deps=("a",)), + ]) + + profile = ProfileReport.from_report(report, graph) + + # a 在 t=2 结束,b 在 t=1 开始,delta = -1,钳制为 0 + assert profile.task("b").wait_time == 0.0 + + def test_skipped_task_zero_wait(self) -> None: + """SKIPPED 任务等待时间为 0.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + report.results["b"] = _skipped_result("b") + graph = px.Graph.from_specs([ + _spec("a"), + _spec("b", deps=("a",)), + ]) + + profile = ProfileReport.from_report(report, graph) + + assert profile.task("b").wait_time == 0.0 + + +class TestCriticalPath: + """测试关键路径分析.""" + + def test_diamond_dependency(self) -> None: + """菱形依赖:a -> b -> d, a -> c -> d,关键路径取较长分支.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + report.results["b"] = _result("b", start + timedelta(seconds=1), 3.0) + report.results["c"] = _result("c", start + timedelta(seconds=1), 1.0) + report.results["d"] = _result("d", start + timedelta(seconds=4), 1.0) + graph = px.Graph.from_specs([ + _spec("a"), + _spec("b", deps=("a",)), + _spec("c", deps=("a",)), + _spec("d", deps=("b", "c")), + ]) + + profile = ProfileReport.from_report(report, graph) + + # 关键路径:a -> b -> d = 1 + 3 + 1 = 5 + assert profile.critical_path_duration == 5.0 + assert profile.critical_path == ("a", "b", "d") + + def test_graph_validation_failure_returns_empty(self) -> None: + """图校验失败(有环)应回退为空关键路径.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + # 手动构造带环的图(绕过校验) + graph = px.Graph() + graph.specs["a"] = _spec("a", deps=("b",)) + graph.specs["b"] = _spec("b", deps=("a",)) + graph.deps["a"] = ("b",) + graph.deps["b"] = ("a",) + + profile = ProfileReport.from_report(report, graph) + + # layers() 抛 CycleError,回退为空 + assert profile.critical_path == () + assert profile.critical_path_duration == 0.0 + + +class TestParallelism: + """测试并行度计算.""" + + def test_no_timestamps_zero_parallelism(self) -> None: + """所有任务无时间戳:并行度为 0.""" + report = px.RunReport() + report.results["a"] = TaskResult[Any](spec=_spec("a"), status=TaskStatus.SUCCESS) + graph = px.Graph.from_specs([_spec("a")]) + + profile = ProfileReport.from_report(report, graph) + + assert profile.avg_parallelism == 0.0 + assert profile.peak_parallelism == 0 + + def test_zero_duration_excluded(self) -> None: + """零耗时任务(end <= start)不参与并行度计算.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 0.0) # 零耗时 + report.results["b"] = _result("b", start, 1.0) + graph = px.Graph.from_specs([_spec("a"), _spec("b")]) + + profile = ProfileReport.from_report(report, graph) + + # 只有 b 参与,峰值 = 1 + assert profile.peak_parallelism == 1 + + def test_skipped_with_timestamps_excluded(self) -> None: + """SKIPPED 任务即使带时间戳也不参与并行度计算.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + # SKIPPED 但带时间戳(异常但可能发生) + report.results["a"] = _result("a", start, 1.0, status=TaskStatus.SKIPPED) + report.results["b"] = _result("b", start, 1.0) + graph = px.Graph.from_specs([_spec("a"), _spec("b")]) + + profile = ProfileReport.from_report(report, graph) + + # a 是 SKIPPED,被排除;只有 b 参与 + assert profile.peak_parallelism == 1 + + def test_peak_parallelism_three_tasks(self) -> None: + """三个任务完全重叠:峰值并行度 = 3.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 3.0) + report.results["b"] = _result("b", start, 3.0) + report.results["c"] = _result("c", start, 3.0) + graph = px.Graph.from_specs([_spec("a"), _spec("b"), _spec("c")]) + + profile = ProfileReport.from_report(report, graph) + + assert profile.peak_parallelism == 3 + assert profile.avg_parallelism == 3.0 + + +class TestQueries: + """测试查询方法.""" + + def test_task_lookup(self) -> None: + """task(name) 应返回对应剖面.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + report.results["b"] = _result("b", start, 2.0) + graph = px.Graph.from_specs([_spec("a"), _spec("b")]) + + profile = ProfileReport.from_report(report, graph) + + assert profile.task("a").name == "a" + assert profile.task("b").duration == 2.0 + + def test_task_lookup_not_found(self) -> None: + """task(name) 不存在应抛 KeyError.""" + report = px.RunReport() + graph = px.Graph() + profile = ProfileReport.from_report(report, graph) + try: + profile.task("missing") + except KeyError: + pass + else: + raise AssertionError("应抛出 KeyError") + + def test_top_bottlenecks(self) -> None: + """top_bottlenecks 应按耗时降序返回.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + report.results["b"] = _result("b", start, 3.0) + report.results["c"] = _result("c", start, 2.0) + graph = px.Graph.from_specs([_spec("a"), _spec("b"), _spec("c")]) + + profile = ProfileReport.from_report(report, graph) + + top3 = profile.top_bottlenecks(3) + assert len(top3) == 3 + assert top3[0].name == "b" + assert top3[1].name == "c" + assert top3[2].name == "a" + + def test_top_bottlenecks_zero_or_negative(self) -> None: + """n <= 0 应返回空元组.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + graph = px.Graph.from_specs([_spec("a")]) + profile = ProfileReport.from_report(report, graph) + + assert profile.top_bottlenecks(0) == () + assert profile.top_bottlenecks(-1) == () + + def test_critical_tasks(self) -> None: + """critical_tasks 应返回关键路径上的任务(按路径顺序).""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + report.results["b"] = _result("b", start + timedelta(seconds=1), 3.0) + report.results["c"] = _result("c", start + timedelta(seconds=1), 1.0) + report.results["d"] = _result("d", start + timedelta(seconds=4), 1.0) + graph = px.Graph.from_specs([ + _spec("a"), + _spec("b", deps=("a",)), + _spec("c", deps=("a",)), + _spec("d", deps=("b", "c")), + ]) + + profile = ProfileReport.from_report(report, graph) + + # 关键路径 a -> b -> d + critical = profile.critical_tasks() + assert len(critical) == 3 + assert [t.name for t in critical] == ["a", "b", "d"] + + def test_failed_tasks(self) -> None: + """failed_tasks 应返回 FAILED 状态的任务.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0, status=TaskStatus.FAILED) + report.results["b"] = _result("b", start, 1.0) + graph = px.Graph.from_specs([_spec("a"), _spec("b")]) + + profile = ProfileReport.from_report(report, graph) + + failed = profile.failed_tasks() + assert len(failed) == 1 + assert failed[0].name == "a" + + def test_skipped_tasks(self) -> None: + """skipped_tasks 应返回 SKIPPED 状态的任务.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + report.results["b"] = _skipped_result("b") + graph = px.Graph.from_specs([_spec("a"), _spec("b")]) + + profile = ProfileReport.from_report(report, graph) + + skipped = profile.skipped_tasks() + assert len(skipped) == 1 + assert skipped[0].name == "b" + + +class TestOutputFormats: + """测试输出格式.""" + + def test_to_dict_structure(self) -> None: + """to_dict 应返回包含所有字段的字典.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.5) + graph = px.Graph.from_specs([_spec("a")]) + + profile = ProfileReport.from_report(report, graph) + d = profile.to_dict() + + assert "tasks" in d + assert "total_duration_seconds" in d + assert "critical_path_duration_seconds" in d + assert "critical_path" in d + assert "avg_parallelism" in d + assert "peak_parallelism" in d + assert "parallelism_efficiency" in d + assert "bottlenecks" in d + assert len(d["tasks"]) == 1 + assert d["tasks"][0]["name"] == "a" + assert d["tasks"][0]["status"] == "success" + assert d["tasks"][0]["duration_seconds"] == 1.5 + assert d["tasks"][0]["is_on_critical_path"] is True + + def test_describe_contains_key_sections(self) -> None: + """describe 应包含关键章节标题.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + graph = px.Graph.from_specs([_spec("a")]) + + profile = ProfileReport.from_report(report, graph) + text = profile.describe() + + assert "PyFlowX 性能剖面报告" in text + assert "【图级指标】" in text + assert "【关键路径】" in text + assert "【Top" in text + assert "【全部任务】" in text + assert "a" in text + + def test_describe_empty_report(self) -> None: + """空报告的 describe 应不崩溃且包含章节标题.""" + report = px.RunReport() + graph = px.Graph() + profile = ProfileReport.from_report(report, graph) + text = profile.describe() + + assert "【图级指标】" in text + assert "(无)" in text + + def test_repr(self) -> None: + """__repr__ 应包含关键指标.""" + start = datetime(2024, 1, 1, 0, 0, 0) + report = px.RunReport() + report.results["a"] = _result("a", start, 1.0) + graph = px.Graph.from_specs([_spec("a")]) + + profile = ProfileReport.from_report(report, graph) + r = repr(profile) + + assert "ProfileReport" in r + assert "tasks=1" in r + assert "total=1.000s" in r + + def test_task_profile_to_dict(self) -> None: + """TaskProfile.to_dict 应返回正确字段.""" + tp = TaskProfile( + name="x", + status=TaskStatus.SUCCESS, + duration=1.5, + attempts=2, + wait_time=0.3, + is_on_critical_path=True, + deps=("a", "b"), + ) + d = tp.to_dict() + + assert d["name"] == "x" + assert d["status"] == "success" + assert d["duration_seconds"] == 1.5 + assert d["attempts"] == 2 + assert d["wait_time_seconds"] == 0.3 + assert d["is_on_critical_path"] is True + assert d["deps"] == ["a", "b"] + + +class TestIntegrationWithRun: + """与真实 run() 集成测试.""" + + def test_profile_from_real_run(self) -> None: + """从真实 run() 结果构建剖面.""" + graph = px.Graph.from_specs([ + px.TaskSpec("a", lambda: 1), + px.TaskSpec("b", lambda: 2, depends_on=("a",)), + px.TaskSpec("c", lambda: 3, depends_on=("a",)), + ]) + report = px.run(graph, strategy="sequential") + + profile = ProfileReport.from_report(report, graph) + + assert len(profile.tasks) == 3 + assert profile.critical_path_duration > 0 + # sequential 策略下并行度应为 1 + assert profile.peak_parallelism == 1 + + def test_profile_from_thread_run(self) -> None: + """从 thread 策略 run() 结果构建剖面,验证并行度 > 1.""" + import time + + def slow() -> int: + time.sleep(0.05) + return 1 + + graph = px.Graph.from_specs([ + px.TaskSpec("a", slow), + px.TaskSpec("b", slow), + px.TaskSpec("c", slow), + ]) + report = px.run(graph, strategy="thread", max_workers=3) + + profile = ProfileReport.from_report(report, graph) + + # 三个任务并行,峰值应 >= 2(可能因调度时机不到 3) + assert profile.peak_parallelism >= 2 + assert profile.critical_path_duration > 0