43e1aad1fe
本次提交更新了版本号至0.2.13,同时完成多项改进: 1. 在.gitignore中新增忽略性能分析文件*_profile.html 2. 修复测试用例中echo命令在Windows下无法被正确检测的问题,改用python命令 3. 优化测试用例确保性能统计数据有效,添加耗时模拟函数 4. 为所有CLI任务统一配置项目根目录作为工作目录,解决跨平台执行路径问题 5. 新增测试验证所有任务的cwd配置正确性
575 lines
21 KiB
Python
575 lines
21 KiB
Python
"""性能剖面(ProfileReport)测试.
|
||
|
||
覆盖策略:
|
||
* 构造带时间戳的 RunReport + Graph,验证关键路径、并行度、瓶颈排序。
|
||
* 边界场景:空报告、单任务、无时间戳、SKIPPED 任务、图校验失败。
|
||
* 输出格式:to_dict / describe / top_bottlenecks / critical_tasks。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from datetime import datetime, timedelta
|
||
from typing import Any
|
||
|
||
import pyflowx as px
|
||
from pyflowx.profiling import ProfileReport, TaskProfile
|
||
from pyflowx.task import TaskResult, TaskSpec, TaskStatus
|
||
|
||
|
||
def _fn() -> int:
|
||
return 1
|
||
|
||
|
||
def _spec(name: str, deps: tuple[str, ...] = ()) -> TaskSpec[Any]:
|
||
return TaskSpec[Any](name, _fn, depends_on=deps)
|
||
|
||
|
||
def _result(
|
||
name: str,
|
||
start: datetime,
|
||
duration: float,
|
||
*,
|
||
status: TaskStatus = TaskStatus.SUCCESS,
|
||
attempts: int = 1,
|
||
) -> TaskResult[Any]:
|
||
"""构造带时间戳的 TaskResult."""
|
||
end = start + timedelta(seconds=duration) if duration > 0 else start
|
||
return TaskResult[Any](
|
||
spec=_spec(name),
|
||
status=status,
|
||
value=None,
|
||
attempts=attempts,
|
||
started_at=start if duration > 0 or status != TaskStatus.SKIPPED else None,
|
||
finished_at=end if duration > 0 or status != TaskStatus.SKIPPED else None,
|
||
)
|
||
|
||
|
||
def _skipped_result(name: str, reason: str = "skip") -> TaskResult[Any]:
|
||
"""构造 SKIPPED 结果(无时间戳)."""
|
||
return TaskResult[Any](
|
||
spec=_spec(name),
|
||
status=TaskStatus.SKIPPED,
|
||
reason=reason,
|
||
)
|
||
|
||
|
||
class TestProfileReportConstruction:
|
||
"""测试 ProfileReport 构建."""
|
||
|
||
def test_empty_report(self) -> None:
|
||
"""空报告应产生空剖面."""
|
||
report = px.RunReport()
|
||
graph = px.Graph()
|
||
profile = ProfileReport.from_report(report, graph)
|
||
assert len(profile.tasks) == 0
|
||
assert profile.total_duration == 0.0
|
||
assert profile.critical_path == ()
|
||
assert profile.critical_path_duration == 0.0
|
||
assert profile.avg_parallelism == 0.0
|
||
assert profile.peak_parallelism == 0
|
||
|
||
def test_single_task(self) -> None:
|
||
"""单任务:关键路径就是它自己,并行度为 1."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.5)
|
||
graph = px.Graph.from_specs([_spec("a")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
assert len(profile.tasks) == 1
|
||
assert profile.tasks[0].name == "a"
|
||
assert profile.tasks[0].duration == 1.5
|
||
assert profile.tasks[0].is_on_critical_path
|
||
assert profile.total_duration == 1.5
|
||
assert profile.critical_path == ("a",)
|
||
assert profile.critical_path_duration == 1.5
|
||
assert profile.avg_parallelism == 1.0
|
||
assert profile.peak_parallelism == 1
|
||
assert profile.parallelism_efficiency == 1.0
|
||
|
||
def test_serial_chain(self) -> None:
|
||
"""串行链 a -> b -> c:关键路径为全部,效率 100%."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
report.results["b"] = _result("b", start + timedelta(seconds=1), 2.0)
|
||
report.results["c"] = _result("c", start + timedelta(seconds=3), 1.5)
|
||
graph = px.Graph.from_specs([
|
||
_spec("a"),
|
||
_spec("b", deps=("a",)),
|
||
_spec("c", deps=("b",)),
|
||
])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
assert profile.total_duration == 4.5
|
||
assert profile.critical_path_duration == 4.5
|
||
assert profile.critical_path == ("a", "b", "c")
|
||
assert profile.parallelism_efficiency == 1.0
|
||
assert profile.peak_parallelism == 1
|
||
assert profile.avg_parallelism == 1.0
|
||
|
||
def test_parallel_tasks(self) -> None:
|
||
"""并行任务 a, b 同时执行:关键路径取较长者,效率 < 1."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
report.results["b"] = _result("b", start, 2.0)
|
||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
# wall-clock = 2.0, 关键路径 = 2.0 (b), 效率 = 1.0
|
||
# 因为关键路径定义就是最长路径,与 wall-clock 相同
|
||
assert profile.total_duration == 2.0
|
||
assert profile.critical_path_duration == 2.0
|
||
assert profile.critical_path == ("b",)
|
||
assert profile.peak_parallelism == 2
|
||
# 平均并行度 = (1.0 + 2.0) / 2.0 = 1.5
|
||
assert profile.avg_parallelism == 1.5
|
||
|
||
def test_parallel_with_join(self) -> None:
|
||
"""a, b 并行后 join 到 c:关键路径 a->c 或 b->c."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
report.results["b"] = _result("b", start, 3.0)
|
||
report.results["c"] = _result("c", start + timedelta(seconds=3), 1.0)
|
||
graph = px.Graph.from_specs([
|
||
_spec("a"),
|
||
_spec("b"),
|
||
_spec("c", deps=("a", "b")),
|
||
])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
# 关键路径 = b -> c (3 + 1 = 4)
|
||
assert profile.critical_path_duration == 4.0
|
||
assert profile.critical_path == ("b", "c")
|
||
assert profile.tasks[0].is_on_critical_path is False # a 不在关键路径
|
||
# task("b") 在关键路径上
|
||
assert profile.task("b").is_on_critical_path
|
||
assert profile.task("c").is_on_critical_path
|
||
|
||
def test_skipped_task_no_timestamp(self) -> None:
|
||
"""SKIPPED 任务无时间戳:不影响并行度计算."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
report.results["b"] = _skipped_result("b")
|
||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
# b 是 SKIPPED,duration=0
|
||
assert profile.task("b").status == TaskStatus.SKIPPED
|
||
assert profile.task("b").duration == 0.0
|
||
assert profile.peak_parallelism == 1 # 只有 a 在跑
|
||
|
||
|
||
class TestWaitTime:
|
||
"""测试等待时间计算."""
|
||
|
||
def test_no_deps_zero_wait(self) -> None:
|
||
"""无依赖任务等待时间为 0."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
graph = px.Graph.from_specs([_spec("a")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
assert profile.task("a").wait_time == 0.0
|
||
|
||
def test_wait_after_dep_completes(self) -> None:
|
||
"""b 在 a 完成后等待 0.5s 才开始."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
report.results["b"] = _result("b", start + timedelta(seconds=1.5), 1.0)
|
||
graph = px.Graph.from_specs([
|
||
_spec("a"),
|
||
_spec("b", deps=("a",)),
|
||
])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
assert profile.task("b").wait_time == 0.5
|
||
|
||
def test_wait_negative_clamped_to_zero(self) -> None:
|
||
"""b 在 a 完成前就开始(异常情况)应钳制为 0."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 2.0)
|
||
# b 在 a 还没完成时就开始(不应该但可能发生)
|
||
report.results["b"] = _result("b", start + timedelta(seconds=1), 1.0)
|
||
graph = px.Graph.from_specs([
|
||
_spec("a"),
|
||
_spec("b", deps=("a",)),
|
||
])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
# a 在 t=2 结束,b 在 t=1 开始,delta = -1,钳制为 0
|
||
assert profile.task("b").wait_time == 0.0
|
||
|
||
def test_skipped_task_zero_wait(self) -> None:
|
||
"""SKIPPED 任务等待时间为 0."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
report.results["b"] = _skipped_result("b")
|
||
graph = px.Graph.from_specs([
|
||
_spec("a"),
|
||
_spec("b", deps=("a",)),
|
||
])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
assert profile.task("b").wait_time == 0.0
|
||
|
||
|
||
class TestCriticalPath:
|
||
"""测试关键路径分析."""
|
||
|
||
def test_diamond_dependency(self) -> None:
|
||
"""菱形依赖:a -> b -> d, a -> c -> d,关键路径取较长分支."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
report.results["b"] = _result("b", start + timedelta(seconds=1), 3.0)
|
||
report.results["c"] = _result("c", start + timedelta(seconds=1), 1.0)
|
||
report.results["d"] = _result("d", start + timedelta(seconds=4), 1.0)
|
||
graph = px.Graph.from_specs([
|
||
_spec("a"),
|
||
_spec("b", deps=("a",)),
|
||
_spec("c", deps=("a",)),
|
||
_spec("d", deps=("b", "c")),
|
||
])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
# 关键路径:a -> b -> d = 1 + 3 + 1 = 5
|
||
assert profile.critical_path_duration == 5.0
|
||
assert profile.critical_path == ("a", "b", "d")
|
||
|
||
def test_graph_validation_failure_returns_empty(self) -> None:
|
||
"""图校验失败(有环)应回退为空关键路径."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
# 手动构造带环的图(绕过校验)
|
||
graph = px.Graph()
|
||
graph.specs["a"] = _spec("a", deps=("b",))
|
||
graph.specs["b"] = _spec("b", deps=("a",))
|
||
graph.deps["a"] = ("b",)
|
||
graph.deps["b"] = ("a",)
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
# layers() 抛 CycleError,回退为空
|
||
assert profile.critical_path == ()
|
||
assert profile.critical_path_duration == 0.0
|
||
|
||
|
||
class TestParallelism:
|
||
"""测试并行度计算."""
|
||
|
||
def test_no_timestamps_zero_parallelism(self) -> None:
|
||
"""所有任务无时间戳:并行度为 0."""
|
||
report = px.RunReport()
|
||
report.results["a"] = TaskResult[Any](spec=_spec("a"), status=TaskStatus.SUCCESS)
|
||
graph = px.Graph.from_specs([_spec("a")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
assert profile.avg_parallelism == 0.0
|
||
assert profile.peak_parallelism == 0
|
||
|
||
def test_zero_duration_excluded(self) -> None:
|
||
"""零耗时任务(end <= start)不参与并行度计算."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 0.0) # 零耗时
|
||
report.results["b"] = _result("b", start, 1.0)
|
||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
# 只有 b 参与,峰值 = 1
|
||
assert profile.peak_parallelism == 1
|
||
|
||
def test_skipped_with_timestamps_excluded(self) -> None:
|
||
"""SKIPPED 任务即使带时间戳也不参与并行度计算."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
# SKIPPED 但带时间戳(异常但可能发生)
|
||
report.results["a"] = _result("a", start, 1.0, status=TaskStatus.SKIPPED)
|
||
report.results["b"] = _result("b", start, 1.0)
|
||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
# a 是 SKIPPED,被排除;只有 b 参与
|
||
assert profile.peak_parallelism == 1
|
||
|
||
def test_peak_parallelism_three_tasks(self) -> None:
|
||
"""三个任务完全重叠:峰值并行度 = 3."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 3.0)
|
||
report.results["b"] = _result("b", start, 3.0)
|
||
report.results["c"] = _result("c", start, 3.0)
|
||
graph = px.Graph.from_specs([_spec("a"), _spec("b"), _spec("c")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
assert profile.peak_parallelism == 3
|
||
assert profile.avg_parallelism == 3.0
|
||
|
||
|
||
class TestQueries:
|
||
"""测试查询方法."""
|
||
|
||
def test_task_lookup(self) -> None:
|
||
"""task(name) 应返回对应剖面."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
report.results["b"] = _result("b", start, 2.0)
|
||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
assert profile.task("a").name == "a"
|
||
assert profile.task("b").duration == 2.0
|
||
|
||
def test_task_lookup_not_found(self) -> None:
|
||
"""task(name) 不存在应抛 KeyError."""
|
||
report = px.RunReport()
|
||
graph = px.Graph()
|
||
profile = ProfileReport.from_report(report, graph)
|
||
try:
|
||
profile.task("missing")
|
||
except KeyError:
|
||
pass
|
||
else:
|
||
raise AssertionError("应抛出 KeyError")
|
||
|
||
def test_top_bottlenecks(self) -> None:
|
||
"""top_bottlenecks 应按耗时降序返回."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
report.results["b"] = _result("b", start, 3.0)
|
||
report.results["c"] = _result("c", start, 2.0)
|
||
graph = px.Graph.from_specs([_spec("a"), _spec("b"), _spec("c")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
top3 = profile.top_bottlenecks(3)
|
||
assert len(top3) == 3
|
||
assert top3[0].name == "b"
|
||
assert top3[1].name == "c"
|
||
assert top3[2].name == "a"
|
||
|
||
def test_top_bottlenecks_zero_or_negative(self) -> None:
|
||
"""n <= 0 应返回空元组."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
graph = px.Graph.from_specs([_spec("a")])
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
assert profile.top_bottlenecks(0) == ()
|
||
assert profile.top_bottlenecks(-1) == ()
|
||
|
||
def test_critical_tasks(self) -> None:
|
||
"""critical_tasks 应返回关键路径上的任务(按路径顺序)."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
report.results["b"] = _result("b", start + timedelta(seconds=1), 3.0)
|
||
report.results["c"] = _result("c", start + timedelta(seconds=1), 1.0)
|
||
report.results["d"] = _result("d", start + timedelta(seconds=4), 1.0)
|
||
graph = px.Graph.from_specs([
|
||
_spec("a"),
|
||
_spec("b", deps=("a",)),
|
||
_spec("c", deps=("a",)),
|
||
_spec("d", deps=("b", "c")),
|
||
])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
# 关键路径 a -> b -> d
|
||
critical = profile.critical_tasks()
|
||
assert len(critical) == 3
|
||
assert [t.name for t in critical] == ["a", "b", "d"]
|
||
|
||
def test_failed_tasks(self) -> None:
|
||
"""failed_tasks 应返回 FAILED 状态的任务."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0, status=TaskStatus.FAILED)
|
||
report.results["b"] = _result("b", start, 1.0)
|
||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
failed = profile.failed_tasks()
|
||
assert len(failed) == 1
|
||
assert failed[0].name == "a"
|
||
|
||
def test_skipped_tasks(self) -> None:
|
||
"""skipped_tasks 应返回 SKIPPED 状态的任务."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
report.results["b"] = _skipped_result("b")
|
||
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
skipped = profile.skipped_tasks()
|
||
assert len(skipped) == 1
|
||
assert skipped[0].name == "b"
|
||
|
||
|
||
class TestOutputFormats:
|
||
"""测试输出格式."""
|
||
|
||
def test_to_dict_structure(self) -> None:
|
||
"""to_dict 应返回包含所有字段的字典."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.5)
|
||
graph = px.Graph.from_specs([_spec("a")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
d = profile.to_dict()
|
||
|
||
assert "tasks" in d
|
||
assert "total_duration_seconds" in d
|
||
assert "critical_path_duration_seconds" in d
|
||
assert "critical_path" in d
|
||
assert "avg_parallelism" in d
|
||
assert "peak_parallelism" in d
|
||
assert "parallelism_efficiency" in d
|
||
assert "bottlenecks" in d
|
||
assert len(d["tasks"]) == 1
|
||
assert d["tasks"][0]["name"] == "a"
|
||
assert d["tasks"][0]["status"] == "success"
|
||
assert d["tasks"][0]["duration_seconds"] == 1.5
|
||
assert d["tasks"][0]["is_on_critical_path"] is True
|
||
|
||
def test_describe_contains_key_sections(self) -> None:
|
||
"""describe 应包含关键章节标题."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
graph = px.Graph.from_specs([_spec("a")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
text = profile.describe()
|
||
|
||
assert "PyFlowX 性能剖面报告" in text
|
||
assert "【图级指标】" in text
|
||
assert "【关键路径】" in text
|
||
assert "【Top" in text
|
||
assert "【全部任务】" in text
|
||
assert "a" in text
|
||
|
||
def test_describe_empty_report(self) -> None:
|
||
"""空报告的 describe 应不崩溃且包含章节标题."""
|
||
report = px.RunReport()
|
||
graph = px.Graph()
|
||
profile = ProfileReport.from_report(report, graph)
|
||
text = profile.describe()
|
||
|
||
assert "【图级指标】" in text
|
||
assert "(无)" in text
|
||
|
||
def test_repr(self) -> None:
|
||
"""__repr__ 应包含关键指标."""
|
||
start = datetime(2024, 1, 1, 0, 0, 0)
|
||
report = px.RunReport()
|
||
report.results["a"] = _result("a", start, 1.0)
|
||
graph = px.Graph.from_specs([_spec("a")])
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
r = repr(profile)
|
||
|
||
assert "ProfileReport" in r
|
||
assert "tasks=1" in r
|
||
assert "total=1.000s" in r
|
||
|
||
def test_task_profile_to_dict(self) -> None:
|
||
"""TaskProfile.to_dict 应返回正确字段."""
|
||
tp = TaskProfile(
|
||
name="x",
|
||
status=TaskStatus.SUCCESS,
|
||
duration=1.5,
|
||
attempts=2,
|
||
wait_time=0.3,
|
||
is_on_critical_path=True,
|
||
deps=("a", "b"),
|
||
)
|
||
d = tp.to_dict()
|
||
|
||
assert d["name"] == "x"
|
||
assert d["status"] == "success"
|
||
assert d["duration_seconds"] == 1.5
|
||
assert d["attempts"] == 2
|
||
assert d["wait_time_seconds"] == 0.3
|
||
assert d["is_on_critical_path"] is True
|
||
assert d["deps"] == ["a", "b"]
|
||
|
||
|
||
class TestIntegrationWithRun:
|
||
"""与真实 run() 集成测试."""
|
||
|
||
def test_profile_from_real_run(self) -> None:
|
||
"""从真实 run() 结果构建剖面."""
|
||
import time
|
||
|
||
def slow() -> int:
|
||
time.sleep(0.01) # 确保任务有实际耗时,避免 duration 极小导致并行度计算为 0
|
||
return 1
|
||
|
||
graph = px.Graph.from_specs([
|
||
px.TaskSpec("a", slow),
|
||
px.TaskSpec("b", slow, depends_on=("a",)),
|
||
px.TaskSpec("c", slow, depends_on=("a",)),
|
||
])
|
||
report = px.run(graph, strategy="sequential")
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
assert len(profile.tasks) == 3
|
||
# sequential 策略下应为串行,duration > 0
|
||
assert profile.critical_path_duration > 0
|
||
# sequential 策略下并行度应为 1
|
||
assert profile.peak_parallelism == 1
|
||
|
||
def test_profile_from_thread_run(self) -> None:
|
||
"""从 thread 策略 run() 结果构建剖面,验证并行度 > 1."""
|
||
import time
|
||
|
||
def slow() -> int:
|
||
time.sleep(0.05)
|
||
return 1
|
||
|
||
graph = px.Graph.from_specs([
|
||
px.TaskSpec("a", slow),
|
||
px.TaskSpec("b", slow),
|
||
px.TaskSpec("c", slow),
|
||
])
|
||
report = px.run(graph, strategy="thread", max_workers=3)
|
||
|
||
profile = ProfileReport.from_report(report, graph)
|
||
|
||
# 三个任务并行,峰值应 >= 2(可能因调度时机不到 3)
|
||
assert profile.peak_parallelism >= 2
|
||
assert profile.critical_path_duration > 0
|