feat(profiling): 添加工作流性能分析模块与测试用例
新增了性能剖面分析能力,支持从运行报告生成任务级、图级性能指标,包括关键路径、并行度分析和瓶颈识别,同时补充了完整的单元测试覆盖。
This commit is contained in:
@@ -82,6 +82,7 @@ from .errors import (
|
||||
)
|
||||
from .executors import Strategy, run
|
||||
from .graph import Graph, GraphDefaults
|
||||
from .profiling import ProfileReport, TaskProfile
|
||||
from .report import RunReport
|
||||
from .runner import CliExitCode, CliRunner
|
||||
from .storage import JSONBackend, MemoryBackend, StateBackend
|
||||
@@ -122,6 +123,7 @@ __all__ = [
|
||||
"JSONBackend",
|
||||
"MemoryBackend",
|
||||
"MissingDependencyError",
|
||||
"ProfileReport",
|
||||
"PyFlowXError",
|
||||
"RetryPolicy",
|
||||
"RunReport",
|
||||
@@ -132,6 +134,7 @@ __all__ = [
|
||||
"TaskEvent",
|
||||
"TaskFailedError",
|
||||
"TaskHooks",
|
||||
"TaskProfile",
|
||||
"TaskResult",
|
||||
"TaskSpec",
|
||||
"TaskStatus",
|
||||
|
||||
@@ -0,0 +1,451 @@
|
||||
"""工作流执行性能评估。
|
||||
|
||||
基于 :class:`~pyflowx.report.RunReport` 中已有的 ``started_at`` /
|
||||
``finished_at`` 时间戳进行离线分析,**零运行时开销**——不修改执行流程,
|
||||
不注册回调,不引入额外计时器。
|
||||
|
||||
核心指标
|
||||
--------
|
||||
* **任务级**:每个任务的 wall-clock 耗时、状态、重试次数、等待时间
|
||||
(从最早依赖完成到本任务开始)。
|
||||
* **图级**:总耗时(wall-clock)、关键路径耗时(理论最短耗时)、
|
||||
并行度效率(关键路径耗时 / 总耗时)。
|
||||
* **关键路径**:从源点到汇点的最长依赖路径,识别真正的串行瓶颈。
|
||||
* **并行度**:基于时间线重叠计算瞬时并行度,给出平均并行度与峰值并行度。
|
||||
* **瓶颈识别**:按耗时排序的 Top-N 任务。
|
||||
|
||||
设计原则
|
||||
--------
|
||||
* 数据来源于 ``RunReport`` + ``Graph``,无副作用。
|
||||
* 计算复杂度 O(V+E):拓扑排序 + 单次松弛,适合大规模图。
|
||||
* 所有时间戳用 ``datetime``,与 :class:`TaskResult` 保持一致。
|
||||
|
||||
快速上手
|
||||
--------
|
||||
import pyflowx as px
|
||||
|
||||
report = px.run(graph)
|
||||
profile = px.ProfileReport.from_report(report, graph)
|
||||
print(profile.describe())
|
||||
bottlenecks = profile.top_bottlenecks(3)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
__all__ = [
|
||||
"ProfileReport",
|
||||
"TaskProfile",
|
||||
]
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from .graph import Graph
|
||||
from .report import RunReport
|
||||
from .task import TaskResult, TaskStatus
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TaskProfile:
|
||||
"""单个任务的性能剖面。
|
||||
|
||||
属性
|
||||
----
|
||||
name:
|
||||
任务名。
|
||||
status:
|
||||
终态(SUCCESS/FAILED/SKIPPED)。
|
||||
duration:
|
||||
wall-clock 执行耗时(秒)。SKIPPED 任务为 0.0。
|
||||
attempts:
|
||||
尝试次数(含首次)。
|
||||
wait_time:
|
||||
从最早硬依赖完成到本任务开始的等待时间(秒)。
|
||||
无硬依赖或 SKIPPED 时为 0.0。
|
||||
is_on_critical_path:
|
||||
是否位于关键路径上。
|
||||
deps:
|
||||
硬依赖任务名列表。
|
||||
"""
|
||||
|
||||
name: str
|
||||
status: TaskStatus
|
||||
duration: float
|
||||
attempts: int
|
||||
wait_time: float
|
||||
is_on_critical_path: bool
|
||||
deps: tuple[str, ...]
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""转为 JSON 友好的字典。"""
|
||||
return {
|
||||
"name": self.name,
|
||||
"status": self.status.value,
|
||||
"duration_seconds": round(self.duration, 6),
|
||||
"attempts": self.attempts,
|
||||
"wait_time_seconds": round(self.wait_time, 6),
|
||||
"is_on_critical_path": self.is_on_critical_path,
|
||||
"deps": list(self.deps),
|
||||
}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ProfileReport:
|
||||
"""工作流执行的性能剖面报告。
|
||||
|
||||
通过 :meth:`from_report` 从 :class:`RunReport` + :class:`Graph` 构建。
|
||||
所有字段在构造时一次性计算完毕,后续访问为 O(1)。
|
||||
"""
|
||||
|
||||
tasks: tuple[TaskProfile, ...]
|
||||
"""所有任务的性能剖面(按拓扑序)。"""
|
||||
|
||||
total_duration: float
|
||||
"""整次运行的 wall-clock 耗时(秒)。"""
|
||||
|
||||
critical_path_duration: float
|
||||
"""关键路径耗时(秒):从最早任务开始到最晚任务结束的最长依赖路径。"""
|
||||
|
||||
critical_path: tuple[str, ...]
|
||||
"""关键路径上的任务名序列(按执行顺序)。"""
|
||||
|
||||
avg_parallelism: float
|
||||
"""平均并行度 = 任务总耗时 / wall-clock 总耗时。"""
|
||||
|
||||
peak_parallelism: int
|
||||
"""峰值并行度:任一时刻同时运行的任务数最大值。"""
|
||||
|
||||
parallelism_efficiency: float
|
||||
"""并行度效率 = 关键路径耗时 / wall-clock 总耗时。``1.0`` 表示完全串行,
|
||||
越大表示并行化收益越低(瓶颈在关键路径上)。"""
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# 构建
|
||||
# ------------------------------------------------------------------ #
|
||||
@classmethod
|
||||
def from_report(cls, report: RunReport, graph: Graph) -> ProfileReport:
|
||||
"""从运行报告与图构建性能剖面。
|
||||
|
||||
参数
|
||||
----
|
||||
report:
|
||||
已完成的 :class:`RunReport`,需包含 ``started_at``/``finished_at``。
|
||||
graph:
|
||||
对应的 :class:`Graph`,用于依赖关系与关键路径分析。
|
||||
|
||||
Note
|
||||
-----
|
||||
本方法不修改 ``report`` 或 ``graph``,纯函数式计算。
|
||||
"""
|
||||
task_profiles = cls._build_task_profiles(report, graph)
|
||||
total_duration = cls._calc_total_duration(report)
|
||||
critical_path, critical_duration = cls._calc_critical_path(graph, report)
|
||||
avg_par, peak_par = cls._calc_parallelism(report)
|
||||
efficiency = critical_duration / total_duration if total_duration > 0 else 0.0
|
||||
|
||||
# 标记关键路径上的任务
|
||||
critical_set = set(critical_path)
|
||||
marked = tuple(
|
||||
TaskProfile(
|
||||
name=t.name,
|
||||
status=t.status,
|
||||
duration=t.duration,
|
||||
attempts=t.attempts,
|
||||
wait_time=t.wait_time,
|
||||
is_on_critical_path=t.name in critical_set,
|
||||
deps=t.deps,
|
||||
)
|
||||
for t in task_profiles
|
||||
)
|
||||
|
||||
return cls(
|
||||
tasks=marked,
|
||||
total_duration=total_duration,
|
||||
critical_path_duration=critical_duration,
|
||||
critical_path=critical_path,
|
||||
avg_parallelism=avg_par,
|
||||
peak_parallelism=peak_par,
|
||||
parallelism_efficiency=efficiency,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _build_task_profiles(report: RunReport, graph: Graph) -> tuple[TaskProfile, ...]:
|
||||
"""构建每个任务的性能剖面。"""
|
||||
profiles: list[TaskProfile] = []
|
||||
for name, result in report.results.items():
|
||||
spec = graph.specs.get(name)
|
||||
deps = tuple(spec.depends_on) if spec is not None else ()
|
||||
duration = result.duration or 0.0
|
||||
wait_time = ProfileReport._calc_wait_time(result, deps, report)
|
||||
profiles.append(
|
||||
TaskProfile(
|
||||
name=name,
|
||||
status=result.status,
|
||||
duration=duration,
|
||||
attempts=result.attempts,
|
||||
wait_time=wait_time,
|
||||
is_on_critical_path=False, # 后续标记
|
||||
deps=deps,
|
||||
)
|
||||
)
|
||||
return tuple(profiles)
|
||||
|
||||
@staticmethod
|
||||
def _calc_wait_time(
|
||||
result: TaskResult[Any],
|
||||
deps: tuple[str, ...],
|
||||
report: RunReport,
|
||||
) -> float:
|
||||
"""计算等待时间:从最早依赖完成到本任务开始。
|
||||
|
||||
无硬依赖、SKIPPED 任务或时间戳缺失时返回 0.0。
|
||||
"""
|
||||
if not deps or result.started_at is None or result.status == TaskStatus.SKIPPED:
|
||||
return 0.0
|
||||
# 找出所有已完成依赖的最晚完成时间
|
||||
dep_end_times: list[datetime] = []
|
||||
for dep in deps:
|
||||
dep_result = report.results.get(dep)
|
||||
if dep_result is not None and dep_result.finished_at is not None:
|
||||
dep_end_times.append(dep_result.finished_at)
|
||||
if not dep_end_times:
|
||||
return 0.0
|
||||
latest_dep_end = max(dep_end_times)
|
||||
delta = (result.started_at - latest_dep_end).total_seconds()
|
||||
return max(0.0, delta)
|
||||
|
||||
@staticmethod
|
||||
def _calc_total_duration(report: RunReport) -> float:
|
||||
"""计算 wall-clock 总耗时:最早开始到最晚结束。"""
|
||||
starts: list[datetime] = []
|
||||
ends: list[datetime] = []
|
||||
for r in report.results.values():
|
||||
if r.started_at is not None:
|
||||
starts.append(r.started_at)
|
||||
if r.finished_at is not None:
|
||||
ends.append(r.finished_at)
|
||||
if not starts or not ends:
|
||||
return 0.0
|
||||
return (max(ends) - min(starts)).total_seconds()
|
||||
|
||||
@staticmethod
|
||||
def _calc_critical_path(graph: Graph, report: RunReport) -> tuple[tuple[str, ...], float]:
|
||||
"""计算关键路径:DAG 最长路径(按实际执行耗时)。
|
||||
|
||||
使用拓扑排序 + 动态规划,O(V+E)。SKIPPED 任务耗时按 0 计。
|
||||
"""
|
||||
# 构建耗时映射
|
||||
durations: dict[str, float] = {}
|
||||
for name, result in report.results.items():
|
||||
durations[name] = result.duration or 0.0
|
||||
|
||||
# 拓扑序(使用 graph.layers 保证与分层一致)
|
||||
try:
|
||||
layers = graph.layers()
|
||||
except Exception:
|
||||
# 图校验失败时回退为空
|
||||
return (), 0.0
|
||||
|
||||
# earliest_finish[name] = duration[name] + max(earliest_finish[dep] for dep in deps)
|
||||
earliest_finish: dict[str, float] = {}
|
||||
predecessor: dict[str, str | None] = {}
|
||||
|
||||
for layer in layers:
|
||||
for name in layer:
|
||||
spec = graph.specs.get(name)
|
||||
deps = spec.depends_on if spec is not None else ()
|
||||
if not deps:
|
||||
earliest_finish[name] = durations.get(name, 0.0)
|
||||
predecessor[name] = None
|
||||
else:
|
||||
best_dep: str | None = None
|
||||
best_ef = 0.0
|
||||
for dep in deps:
|
||||
ef = earliest_finish.get(dep, 0.0)
|
||||
if ef >= best_ef:
|
||||
best_ef = ef
|
||||
best_dep = dep
|
||||
earliest_finish[name] = best_ef + durations.get(name, 0.0)
|
||||
predecessor[name] = best_dep
|
||||
|
||||
if not earliest_finish:
|
||||
return (), 0.0
|
||||
|
||||
# 找到 earliest_finish 最大的节点作为终点
|
||||
end_node = max(earliest_finish, key=lambda n: earliest_finish[n])
|
||||
total = earliest_finish[end_node]
|
||||
|
||||
# 回溯关键路径
|
||||
path: list[str] = []
|
||||
node: str | None = end_node
|
||||
while node is not None:
|
||||
path.append(node)
|
||||
node = predecessor.get(node)
|
||||
path.reverse()
|
||||
|
||||
return tuple(path), total
|
||||
|
||||
@staticmethod
|
||||
def _calc_parallelism(report: RunReport) -> tuple[float, int]:
|
||||
"""计算平均并行度与峰值并行度。
|
||||
|
||||
基于时间线扫描:将每个任务的 [started_at, finished_at] 区间
|
||||
转为事件点(+1/-1),排序后扫描得到瞬时并行度序列。
|
||||
|
||||
返回 (avg_parallelism, peak_parallelism)。
|
||||
无有效时间戳时返回 (0.0, 0)。
|
||||
"""
|
||||
events: list[tuple[float, int]] = [] # (timestamp, delta)
|
||||
for r in report.results.values():
|
||||
if r.started_at is None or r.finished_at is None:
|
||||
continue
|
||||
if r.status == TaskStatus.SKIPPED:
|
||||
continue
|
||||
start_ts = r.started_at.timestamp()
|
||||
end_ts = r.finished_at.timestamp()
|
||||
if end_ts <= start_ts:
|
||||
continue
|
||||
events.append((start_ts, 1))
|
||||
events.append((end_ts, -1))
|
||||
|
||||
if not events:
|
||||
return 0.0, 0
|
||||
|
||||
# 排序:同一时间点先处理结束(-1)再处理开始(+1),避免虚假峰值
|
||||
events.sort(key=lambda e: (e[0], e[1]))
|
||||
|
||||
current = 0
|
||||
peak = 0
|
||||
# 加权面积用于计算平均并行度
|
||||
area = 0.0
|
||||
prev_ts = events[0][0]
|
||||
for ts, delta in events:
|
||||
if ts > prev_ts:
|
||||
area += current * (ts - prev_ts)
|
||||
current += delta
|
||||
peak = max(peak, current)
|
||||
prev_ts = ts
|
||||
|
||||
total_span = events[-1][0] - events[0][0]
|
||||
avg = area / total_span if total_span > 0 else 0.0
|
||||
return avg, peak
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# 查询
|
||||
# ------------------------------------------------------------------ #
|
||||
def task(self, name: str) -> TaskProfile:
|
||||
"""返回指定任务的剖面。不存在则 ``KeyError``。"""
|
||||
for t in self.tasks:
|
||||
if t.name == name:
|
||||
return t
|
||||
raise KeyError(name)
|
||||
|
||||
def top_bottlenecks(self, n: int = 5) -> tuple[TaskProfile, ...]:
|
||||
"""返回耗时最长的 Top-N 任务(按 duration 降序)。
|
||||
|
||||
参数
|
||||
----
|
||||
n:
|
||||
返回数量。``n <= 0`` 返回空元组。
|
||||
"""
|
||||
if n <= 0:
|
||||
return ()
|
||||
return tuple(sorted(self.tasks, key=lambda t: t.duration, reverse=True)[:n])
|
||||
|
||||
def critical_tasks(self) -> tuple[TaskProfile, ...]:
|
||||
"""返回关键路径上的所有任务(按路径顺序)。"""
|
||||
critical_set = set(self.critical_path)
|
||||
# 保持关键路径顺序
|
||||
order = {name: i for i, name in enumerate(self.critical_path)}
|
||||
return tuple(sorted((t for t in self.tasks if t.name in critical_set), key=lambda t: order[t.name]))
|
||||
|
||||
def failed_tasks(self) -> tuple[TaskProfile, ...]:
|
||||
"""返回 FAILED 状态的任务。"""
|
||||
return tuple(t for t in self.tasks if t.status == TaskStatus.FAILED)
|
||||
|
||||
def skipped_tasks(self) -> tuple[TaskProfile, ...]:
|
||||
"""返回 SKIPPED 状态的任务。"""
|
||||
return tuple(t for t in self.tasks if t.status == TaskStatus.SKIPPED)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# 输出
|
||||
# ------------------------------------------------------------------ #
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""转为 JSON 友好的字典。"""
|
||||
return {
|
||||
"tasks": [t.to_dict() for t in self.tasks],
|
||||
"total_duration_seconds": round(self.total_duration, 6),
|
||||
"critical_path_duration_seconds": round(self.critical_path_duration, 6),
|
||||
"critical_path": list(self.critical_path),
|
||||
"avg_parallelism": round(self.avg_parallelism, 4),
|
||||
"peak_parallelism": self.peak_parallelism,
|
||||
"parallelism_efficiency": round(self.parallelism_efficiency, 4),
|
||||
"bottlenecks": [t.to_dict() for t in self.top_bottlenecks(5)],
|
||||
}
|
||||
|
||||
def describe(self) -> str:
|
||||
"""人类可读的多行性能报告。"""
|
||||
lines: list[str] = []
|
||||
lines.append("=" * 70)
|
||||
lines.append("PyFlowX 性能剖面报告")
|
||||
lines.append("=" * 70)
|
||||
lines.append("")
|
||||
lines.append("【图级指标】")
|
||||
lines.append(f" 总耗时 (wall-clock): {self.total_duration:.3f}s")
|
||||
lines.append(f" 关键路径耗时: {self.critical_path_duration:.3f}s")
|
||||
lines.append(f" 平均并行度: {self.avg_parallelism:.2f}")
|
||||
lines.append(f" 峰值并行度: {self.peak_parallelism}")
|
||||
lines.append(f" 并行度效率: {self.parallelism_efficiency:.2%}")
|
||||
lines.append(f" 任务总数: {len(self.tasks)}")
|
||||
lines.append("")
|
||||
|
||||
# 关键路径
|
||||
lines.append("【关键路径】")
|
||||
if self.critical_path:
|
||||
lines.append(f" {' -> '.join(self.critical_path)}")
|
||||
else:
|
||||
lines.append(" (无)")
|
||||
lines.append("")
|
||||
|
||||
# Top 瓶颈
|
||||
bottlenecks = self.top_bottlenecks(5)
|
||||
lines.append(f"【Top {len(bottlenecks)} 瓶颈任务】")
|
||||
if bottlenecks:
|
||||
lines.append(f" {'任务':<30} {'耗时':>10} {'等待':>10} {'尝试':>6} {'关键路径':>8} {'状态':>8}")
|
||||
lines.append(f" {'-' * 30} {'-' * 10} {'-' * 10} {'-' * 6} {'-' * 8} {'-' * 8}")
|
||||
for t in bottlenecks:
|
||||
critical_flag = "✓" if t.is_on_critical_path else ""
|
||||
lines.append(
|
||||
f" {t.name:<30} {t.duration:>9.3f}s {t.wait_time:>9.3f}s {t.attempts:>6} "
|
||||
f"{critical_flag:>8} {t.status.value:>8}",
|
||||
)
|
||||
else:
|
||||
lines.append(" (无)")
|
||||
lines.append("")
|
||||
|
||||
# 全部任务详情
|
||||
lines.append("【全部任务】")
|
||||
if self.tasks:
|
||||
lines.append(f" {'任务':<30} {'耗时':>10} {'等待':>10} {'尝试':>6} {'关键路径':>8} {'状态':>8}")
|
||||
lines.append(f" {'-' * 30} {'-' * 10} {'-' * 10} {'-' * 6} {'-' * 8} {'-' * 8}")
|
||||
for t in self.tasks:
|
||||
critical_flag = "✓" if t.is_on_critical_path else ""
|
||||
lines.append(
|
||||
f" {t.name:<30} {t.duration:>9.3f}s {t.wait_time:>9.3f}s {t.attempts:>6} "
|
||||
f"{critical_flag:>8} {t.status.value:>8}",
|
||||
)
|
||||
else:
|
||||
lines.append(" (无)")
|
||||
lines.append("")
|
||||
lines.append("=" * 70)
|
||||
return "\n".join(lines)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f"ProfileReport(tasks={len(self.tasks)}, "
|
||||
f"total={self.total_duration:.3f}s, "
|
||||
f"critical={self.critical_path_duration:.3f}s, "
|
||||
f"avg_par={self.avg_parallelism:.2f}, "
|
||||
f"peak_par={self.peak_parallelism})"
|
||||
)
|
||||
@@ -0,0 +1,567 @@
|
||||
"""性能剖面(ProfileReport)测试.
|
||||
|
||||
覆盖策略:
|
||||
* 构造带时间戳的 RunReport + Graph,验证关键路径、并行度、瓶颈排序。
|
||||
* 边界场景:空报告、单任务、无时间戳、SKIPPED 任务、图校验失败。
|
||||
* 输出格式:to_dict / describe / top_bottlenecks / critical_tasks。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
import pyflowx as px
|
||||
from pyflowx.profiling import ProfileReport, TaskProfile
|
||||
from pyflowx.task import TaskResult, TaskSpec, TaskStatus
|
||||
|
||||
|
||||
def _fn() -> int:
|
||||
return 1
|
||||
|
||||
|
||||
def _spec(name: str, deps: tuple[str, ...] = ()) -> TaskSpec[Any]:
|
||||
return TaskSpec[Any](name, _fn, depends_on=deps)
|
||||
|
||||
|
||||
def _result(
|
||||
name: str,
|
||||
start: datetime,
|
||||
duration: float,
|
||||
*,
|
||||
status: TaskStatus = TaskStatus.SUCCESS,
|
||||
attempts: int = 1,
|
||||
) -> TaskResult[Any]:
|
||||
"""构造带时间戳的 TaskResult."""
|
||||
end = start + timedelta(seconds=duration) if duration > 0 else start
|
||||
return TaskResult[Any](
|
||||
spec=_spec(name),
|
||||
status=status,
|
||||
value=None,
|
||||
attempts=attempts,
|
||||
started_at=start if duration > 0 or status != TaskStatus.SKIPPED else None,
|
||||
finished_at=end if duration > 0 or status != TaskStatus.SKIPPED else None,
|
||||
)
|
||||
|
||||
|
||||
def _skipped_result(name: str, reason: str = "skip") -> TaskResult[Any]:
|
||||
"""构造 SKIPPED 结果(无时间戳)."""
|
||||
return TaskResult[Any](
|
||||
spec=_spec(name),
|
||||
status=TaskStatus.SKIPPED,
|
||||
reason=reason,
|
||||
)
|
||||
|
||||
|
||||
class TestProfileReportConstruction:
|
||||
"""测试 ProfileReport 构建."""
|
||||
|
||||
def test_empty_report(self) -> None:
|
||||
"""空报告应产生空剖面."""
|
||||
report = px.RunReport()
|
||||
graph = px.Graph()
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
assert len(profile.tasks) == 0
|
||||
assert profile.total_duration == 0.0
|
||||
assert profile.critical_path == ()
|
||||
assert profile.critical_path_duration == 0.0
|
||||
assert profile.avg_parallelism == 0.0
|
||||
assert profile.peak_parallelism == 0
|
||||
|
||||
def test_single_task(self) -> None:
|
||||
"""单任务:关键路径就是它自己,并行度为 1."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.5)
|
||||
graph = px.Graph.from_specs([_spec("a")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
assert len(profile.tasks) == 1
|
||||
assert profile.tasks[0].name == "a"
|
||||
assert profile.tasks[0].duration == 1.5
|
||||
assert profile.tasks[0].is_on_critical_path
|
||||
assert profile.total_duration == 1.5
|
||||
assert profile.critical_path == ("a",)
|
||||
assert profile.critical_path_duration == 1.5
|
||||
assert profile.avg_parallelism == 1.0
|
||||
assert profile.peak_parallelism == 1
|
||||
assert profile.parallelism_efficiency == 1.0
|
||||
|
||||
def test_serial_chain(self) -> None:
|
||||
"""串行链 a -> b -> c:关键路径为全部,效率 100%."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
report.results["b"] = _result("b", start + timedelta(seconds=1), 2.0)
|
||||
report.results["c"] = _result("c", start + timedelta(seconds=3), 1.5)
|
||||
graph = px.Graph.from_specs([
|
||||
_spec("a"),
|
||||
_spec("b", deps=("a",)),
|
||||
_spec("c", deps=("b",)),
|
||||
])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
assert profile.total_duration == 4.5
|
||||
assert profile.critical_path_duration == 4.5
|
||||
assert profile.critical_path == ("a", "b", "c")
|
||||
assert profile.parallelism_efficiency == 1.0
|
||||
assert profile.peak_parallelism == 1
|
||||
assert profile.avg_parallelism == 1.0
|
||||
|
||||
def test_parallel_tasks(self) -> None:
|
||||
"""并行任务 a, b 同时执行:关键路径取较长者,效率 < 1."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
report.results["b"] = _result("b", start, 2.0)
|
||||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
# wall-clock = 2.0, 关键路径 = 2.0 (b), 效率 = 1.0
|
||||
# 因为关键路径定义就是最长路径,与 wall-clock 相同
|
||||
assert profile.total_duration == 2.0
|
||||
assert profile.critical_path_duration == 2.0
|
||||
assert profile.critical_path == ("b",)
|
||||
assert profile.peak_parallelism == 2
|
||||
# 平均并行度 = (1.0 + 2.0) / 2.0 = 1.5
|
||||
assert profile.avg_parallelism == 1.5
|
||||
|
||||
def test_parallel_with_join(self) -> None:
|
||||
"""a, b 并行后 join 到 c:关键路径 a->c 或 b->c."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
report.results["b"] = _result("b", start, 3.0)
|
||||
report.results["c"] = _result("c", start + timedelta(seconds=3), 1.0)
|
||||
graph = px.Graph.from_specs([
|
||||
_spec("a"),
|
||||
_spec("b"),
|
||||
_spec("c", deps=("a", "b")),
|
||||
])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
# 关键路径 = b -> c (3 + 1 = 4)
|
||||
assert profile.critical_path_duration == 4.0
|
||||
assert profile.critical_path == ("b", "c")
|
||||
assert profile.tasks[0].is_on_critical_path is False # a 不在关键路径
|
||||
# task("b") 在关键路径上
|
||||
assert profile.task("b").is_on_critical_path
|
||||
assert profile.task("c").is_on_critical_path
|
||||
|
||||
def test_skipped_task_no_timestamp(self) -> None:
|
||||
"""SKIPPED 任务无时间戳:不影响并行度计算."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
report.results["b"] = _skipped_result("b")
|
||||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
# b 是 SKIPPED,duration=0
|
||||
assert profile.task("b").status == TaskStatus.SKIPPED
|
||||
assert profile.task("b").duration == 0.0
|
||||
assert profile.peak_parallelism == 1 # 只有 a 在跑
|
||||
|
||||
|
||||
class TestWaitTime:
|
||||
"""测试等待时间计算."""
|
||||
|
||||
def test_no_deps_zero_wait(self) -> None:
|
||||
"""无依赖任务等待时间为 0."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
graph = px.Graph.from_specs([_spec("a")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
assert profile.task("a").wait_time == 0.0
|
||||
|
||||
def test_wait_after_dep_completes(self) -> None:
|
||||
"""b 在 a 完成后等待 0.5s 才开始."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
report.results["b"] = _result("b", start + timedelta(seconds=1.5), 1.0)
|
||||
graph = px.Graph.from_specs([
|
||||
_spec("a"),
|
||||
_spec("b", deps=("a",)),
|
||||
])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
assert profile.task("b").wait_time == 0.5
|
||||
|
||||
def test_wait_negative_clamped_to_zero(self) -> None:
|
||||
"""b 在 a 完成前就开始(异常情况)应钳制为 0."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 2.0)
|
||||
# b 在 a 还没完成时就开始(不应该但可能发生)
|
||||
report.results["b"] = _result("b", start + timedelta(seconds=1), 1.0)
|
||||
graph = px.Graph.from_specs([
|
||||
_spec("a"),
|
||||
_spec("b", deps=("a",)),
|
||||
])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
# a 在 t=2 结束,b 在 t=1 开始,delta = -1,钳制为 0
|
||||
assert profile.task("b").wait_time == 0.0
|
||||
|
||||
def test_skipped_task_zero_wait(self) -> None:
|
||||
"""SKIPPED 任务等待时间为 0."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
report.results["b"] = _skipped_result("b")
|
||||
graph = px.Graph.from_specs([
|
||||
_spec("a"),
|
||||
_spec("b", deps=("a",)),
|
||||
])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
assert profile.task("b").wait_time == 0.0
|
||||
|
||||
|
||||
class TestCriticalPath:
|
||||
"""测试关键路径分析."""
|
||||
|
||||
def test_diamond_dependency(self) -> None:
|
||||
"""菱形依赖:a -> b -> d, a -> c -> d,关键路径取较长分支."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
report.results["b"] = _result("b", start + timedelta(seconds=1), 3.0)
|
||||
report.results["c"] = _result("c", start + timedelta(seconds=1), 1.0)
|
||||
report.results["d"] = _result("d", start + timedelta(seconds=4), 1.0)
|
||||
graph = px.Graph.from_specs([
|
||||
_spec("a"),
|
||||
_spec("b", deps=("a",)),
|
||||
_spec("c", deps=("a",)),
|
||||
_spec("d", deps=("b", "c")),
|
||||
])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
# 关键路径:a -> b -> d = 1 + 3 + 1 = 5
|
||||
assert profile.critical_path_duration == 5.0
|
||||
assert profile.critical_path == ("a", "b", "d")
|
||||
|
||||
def test_graph_validation_failure_returns_empty(self) -> None:
|
||||
"""图校验失败(有环)应回退为空关键路径."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
# 手动构造带环的图(绕过校验)
|
||||
graph = px.Graph()
|
||||
graph.specs["a"] = _spec("a", deps=("b",))
|
||||
graph.specs["b"] = _spec("b", deps=("a",))
|
||||
graph.deps["a"] = ("b",)
|
||||
graph.deps["b"] = ("a",)
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
# layers() 抛 CycleError,回退为空
|
||||
assert profile.critical_path == ()
|
||||
assert profile.critical_path_duration == 0.0
|
||||
|
||||
|
||||
class TestParallelism:
|
||||
"""测试并行度计算."""
|
||||
|
||||
def test_no_timestamps_zero_parallelism(self) -> None:
|
||||
"""所有任务无时间戳:并行度为 0."""
|
||||
report = px.RunReport()
|
||||
report.results["a"] = TaskResult[Any](spec=_spec("a"), status=TaskStatus.SUCCESS)
|
||||
graph = px.Graph.from_specs([_spec("a")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
assert profile.avg_parallelism == 0.0
|
||||
assert profile.peak_parallelism == 0
|
||||
|
||||
def test_zero_duration_excluded(self) -> None:
|
||||
"""零耗时任务(end <= start)不参与并行度计算."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 0.0) # 零耗时
|
||||
report.results["b"] = _result("b", start, 1.0)
|
||||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
# 只有 b 参与,峰值 = 1
|
||||
assert profile.peak_parallelism == 1
|
||||
|
||||
def test_skipped_with_timestamps_excluded(self) -> None:
|
||||
"""SKIPPED 任务即使带时间戳也不参与并行度计算."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
# SKIPPED 但带时间戳(异常但可能发生)
|
||||
report.results["a"] = _result("a", start, 1.0, status=TaskStatus.SKIPPED)
|
||||
report.results["b"] = _result("b", start, 1.0)
|
||||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
# a 是 SKIPPED,被排除;只有 b 参与
|
||||
assert profile.peak_parallelism == 1
|
||||
|
||||
def test_peak_parallelism_three_tasks(self) -> None:
|
||||
"""三个任务完全重叠:峰值并行度 = 3."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 3.0)
|
||||
report.results["b"] = _result("b", start, 3.0)
|
||||
report.results["c"] = _result("c", start, 3.0)
|
||||
graph = px.Graph.from_specs([_spec("a"), _spec("b"), _spec("c")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
assert profile.peak_parallelism == 3
|
||||
assert profile.avg_parallelism == 3.0
|
||||
|
||||
|
||||
class TestQueries:
|
||||
"""测试查询方法."""
|
||||
|
||||
def test_task_lookup(self) -> None:
|
||||
"""task(name) 应返回对应剖面."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
report.results["b"] = _result("b", start, 2.0)
|
||||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
assert profile.task("a").name == "a"
|
||||
assert profile.task("b").duration == 2.0
|
||||
|
||||
def test_task_lookup_not_found(self) -> None:
|
||||
"""task(name) 不存在应抛 KeyError."""
|
||||
report = px.RunReport()
|
||||
graph = px.Graph()
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
try:
|
||||
profile.task("missing")
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
raise AssertionError("应抛出 KeyError")
|
||||
|
||||
def test_top_bottlenecks(self) -> None:
|
||||
"""top_bottlenecks 应按耗时降序返回."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
report.results["b"] = _result("b", start, 3.0)
|
||||
report.results["c"] = _result("c", start, 2.0)
|
||||
graph = px.Graph.from_specs([_spec("a"), _spec("b"), _spec("c")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
top3 = profile.top_bottlenecks(3)
|
||||
assert len(top3) == 3
|
||||
assert top3[0].name == "b"
|
||||
assert top3[1].name == "c"
|
||||
assert top3[2].name == "a"
|
||||
|
||||
def test_top_bottlenecks_zero_or_negative(self) -> None:
|
||||
"""n <= 0 应返回空元组."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
graph = px.Graph.from_specs([_spec("a")])
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
assert profile.top_bottlenecks(0) == ()
|
||||
assert profile.top_bottlenecks(-1) == ()
|
||||
|
||||
def test_critical_tasks(self) -> None:
|
||||
"""critical_tasks 应返回关键路径上的任务(按路径顺序)."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
report.results["b"] = _result("b", start + timedelta(seconds=1), 3.0)
|
||||
report.results["c"] = _result("c", start + timedelta(seconds=1), 1.0)
|
||||
report.results["d"] = _result("d", start + timedelta(seconds=4), 1.0)
|
||||
graph = px.Graph.from_specs([
|
||||
_spec("a"),
|
||||
_spec("b", deps=("a",)),
|
||||
_spec("c", deps=("a",)),
|
||||
_spec("d", deps=("b", "c")),
|
||||
])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
# 关键路径 a -> b -> d
|
||||
critical = profile.critical_tasks()
|
||||
assert len(critical) == 3
|
||||
assert [t.name for t in critical] == ["a", "b", "d"]
|
||||
|
||||
def test_failed_tasks(self) -> None:
|
||||
"""failed_tasks 应返回 FAILED 状态的任务."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0, status=TaskStatus.FAILED)
|
||||
report.results["b"] = _result("b", start, 1.0)
|
||||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
failed = profile.failed_tasks()
|
||||
assert len(failed) == 1
|
||||
assert failed[0].name == "a"
|
||||
|
||||
def test_skipped_tasks(self) -> None:
|
||||
"""skipped_tasks 应返回 SKIPPED 状态的任务."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
report.results["b"] = _skipped_result("b")
|
||||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
skipped = profile.skipped_tasks()
|
||||
assert len(skipped) == 1
|
||||
assert skipped[0].name == "b"
|
||||
|
||||
|
||||
class TestOutputFormats:
|
||||
"""测试输出格式."""
|
||||
|
||||
def test_to_dict_structure(self) -> None:
|
||||
"""to_dict 应返回包含所有字段的字典."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.5)
|
||||
graph = px.Graph.from_specs([_spec("a")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
d = profile.to_dict()
|
||||
|
||||
assert "tasks" in d
|
||||
assert "total_duration_seconds" in d
|
||||
assert "critical_path_duration_seconds" in d
|
||||
assert "critical_path" in d
|
||||
assert "avg_parallelism" in d
|
||||
assert "peak_parallelism" in d
|
||||
assert "parallelism_efficiency" in d
|
||||
assert "bottlenecks" in d
|
||||
assert len(d["tasks"]) == 1
|
||||
assert d["tasks"][0]["name"] == "a"
|
||||
assert d["tasks"][0]["status"] == "success"
|
||||
assert d["tasks"][0]["duration_seconds"] == 1.5
|
||||
assert d["tasks"][0]["is_on_critical_path"] is True
|
||||
|
||||
def test_describe_contains_key_sections(self) -> None:
|
||||
"""describe 应包含关键章节标题."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
graph = px.Graph.from_specs([_spec("a")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
text = profile.describe()
|
||||
|
||||
assert "PyFlowX 性能剖面报告" in text
|
||||
assert "【图级指标】" in text
|
||||
assert "【关键路径】" in text
|
||||
assert "【Top" in text
|
||||
assert "【全部任务】" in text
|
||||
assert "a" in text
|
||||
|
||||
def test_describe_empty_report(self) -> None:
|
||||
"""空报告的 describe 应不崩溃且包含章节标题."""
|
||||
report = px.RunReport()
|
||||
graph = px.Graph()
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
text = profile.describe()
|
||||
|
||||
assert "【图级指标】" in text
|
||||
assert "(无)" in text
|
||||
|
||||
def test_repr(self) -> None:
|
||||
"""__repr__ 应包含关键指标."""
|
||||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _result("a", start, 1.0)
|
||||
graph = px.Graph.from_specs([_spec("a")])
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
r = repr(profile)
|
||||
|
||||
assert "ProfileReport" in r
|
||||
assert "tasks=1" in r
|
||||
assert "total=1.000s" in r
|
||||
|
||||
def test_task_profile_to_dict(self) -> None:
|
||||
"""TaskProfile.to_dict 应返回正确字段."""
|
||||
tp = TaskProfile(
|
||||
name="x",
|
||||
status=TaskStatus.SUCCESS,
|
||||
duration=1.5,
|
||||
attempts=2,
|
||||
wait_time=0.3,
|
||||
is_on_critical_path=True,
|
||||
deps=("a", "b"),
|
||||
)
|
||||
d = tp.to_dict()
|
||||
|
||||
assert d["name"] == "x"
|
||||
assert d["status"] == "success"
|
||||
assert d["duration_seconds"] == 1.5
|
||||
assert d["attempts"] == 2
|
||||
assert d["wait_time_seconds"] == 0.3
|
||||
assert d["is_on_critical_path"] is True
|
||||
assert d["deps"] == ["a", "b"]
|
||||
|
||||
|
||||
class TestIntegrationWithRun:
|
||||
"""与真实 run() 集成测试."""
|
||||
|
||||
def test_profile_from_real_run(self) -> None:
|
||||
"""从真实 run() 结果构建剖面."""
|
||||
graph = px.Graph.from_specs([
|
||||
px.TaskSpec("a", lambda: 1),
|
||||
px.TaskSpec("b", lambda: 2, depends_on=("a",)),
|
||||
px.TaskSpec("c", lambda: 3, depends_on=("a",)),
|
||||
])
|
||||
report = px.run(graph, strategy="sequential")
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
assert len(profile.tasks) == 3
|
||||
assert profile.critical_path_duration > 0
|
||||
# sequential 策略下并行度应为 1
|
||||
assert profile.peak_parallelism == 1
|
||||
|
||||
def test_profile_from_thread_run(self) -> None:
|
||||
"""从 thread 策略 run() 结果构建剖面,验证并行度 > 1."""
|
||||
import time
|
||||
|
||||
def slow() -> int:
|
||||
time.sleep(0.05)
|
||||
return 1
|
||||
|
||||
graph = px.Graph.from_specs([
|
||||
px.TaskSpec("a", slow),
|
||||
px.TaskSpec("b", slow),
|
||||
px.TaskSpec("c", slow),
|
||||
])
|
||||
report = px.run(graph, strategy="thread", max_workers=3)
|
||||
|
||||
profile = ProfileReport.from_report(report, graph)
|
||||
|
||||
# 三个任务并行,峰值应 >= 2(可能因调度时机不到 3)
|
||||
assert profile.peak_parallelism >= 2
|
||||
assert profile.critical_path_duration > 0
|
||||
Reference in New Issue
Block a user