Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9cfcfb38e4 | |||
| 69db241611 | |||
| 66e6295a24 | |||
| aebb4fce68 | |||
| 7784c8ff86 | |||
| 77918a5568 | |||
| 7e4c615dc7 | |||
| ac5082523e | |||
| 0df6f7c8ac | |||
| 4b66176ce6 | |||
| cf6b6fd059 | |||
| 6f93e6eb6d | |||
| 43e1aad1fe | |||
| 467634f8c7 | |||
| ce31f60441 | |||
| 3d6d769685 |
@@ -9,42 +9,24 @@ concurrency:
|
|||||||
cancel-in-progress: true
|
cancel-in-progress: true
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
lint-and-typecheck:
|
ci:
|
||||||
name: Lint & Typecheck
|
name: Lint, Typecheck & Test
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: http://gitea:3000/zhou/checkout.git@main
|
||||||
|
|
||||||
- uses: astral-sh/setup-uv@v5
|
- uses: http://gitea:3000/zhou/setup-uv.git@v8.1.0
|
||||||
with:
|
with:
|
||||||
|
uv-version: "0.5.21" # 或 "0.6.0" 等你需要的版本
|
||||||
enable-cache: true
|
enable-cache: true
|
||||||
|
|
||||||
- uses: actions/setup-python@v5
|
- uses: http://gitea:3000/zhou/setup-python.git@v6
|
||||||
with:
|
|
||||||
python-version: '3.13'
|
|
||||||
|
|
||||||
- run: uv sync
|
|
||||||
- run: uv run ruff check src tests
|
|
||||||
- run: uv run pyrefly check .
|
|
||||||
|
|
||||||
test:
|
|
||||||
name: Test (${{ matrix.os }})
|
|
||||||
runs-on: ${{ matrix.os }}
|
|
||||||
strategy:
|
|
||||||
fail-fast: false
|
|
||||||
matrix:
|
|
||||||
os: [ubuntu-latest, windows-latest, macos-latest]
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
|
|
||||||
- uses: astral-sh/setup-uv@v5
|
|
||||||
with:
|
|
||||||
enable-cache: true
|
|
||||||
|
|
||||||
- uses: actions/setup-python@v5
|
|
||||||
with:
|
with:
|
||||||
python-version: |
|
python-version: |
|
||||||
3.8
|
3.8
|
||||||
3.13
|
3.13
|
||||||
|
|
||||||
|
- run: uv sync
|
||||||
|
- run: uv run ruff check src tests
|
||||||
|
- run: uv run pyrefly check .
|
||||||
- run: uvx tox run -e py38,py313
|
- run: uvx tox run -e py38,py313
|
||||||
|
|||||||
@@ -6,56 +6,42 @@ on:
|
|||||||
|
|
||||||
permissions:
|
permissions:
|
||||||
contents: write
|
contents: write
|
||||||
id-token: write
|
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
release:
|
||||||
|
name: Build, Publish & Release
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
outputs:
|
|
||||||
version: ${{ steps.version.outputs.version }}
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: http://gitea:3000/zhou/checkout.git@v4
|
||||||
|
|
||||||
- uses: astral-sh/setup-uv@v5
|
- uses: http://gitea:3000/zhou/setup-uv.git@v8.1.0
|
||||||
with:
|
with:
|
||||||
enable-cache: true
|
enable-cache: true
|
||||||
|
|
||||||
- uses: actions/setup-python@v5
|
- uses: http://gitea:3000/zhou/setup-python.git@v6
|
||||||
with:
|
with:
|
||||||
python-version: '3.13'
|
python-version: '3.13'
|
||||||
|
|
||||||
- run: uv build
|
- run: uv build
|
||||||
|
|
||||||
- id: version
|
- uses: http://gitea:3000/zhou/gh-action-pypi-publish.git@release/v1
|
||||||
run: echo "version=${GITHUB_REF#refs/tags/v}" >> $GITHUB_OUTPUT
|
|
||||||
|
|
||||||
- uses: actions/upload-artifact@v7
|
|
||||||
with:
|
with:
|
||||||
name: dist
|
password: ${{ secrets.PYPI_API_TOKEN }}
|
||||||
path: dist/
|
|
||||||
|
|
||||||
publish-pypi:
|
- name: Create Gitea Release
|
||||||
needs: build
|
run: |
|
||||||
runs-on: ubuntu-latest
|
VERSION=${GITHUB_REF#refs/tags/v}
|
||||||
environment: pypi
|
REPO=${GITHUB_REPOSITORY}
|
||||||
steps:
|
UPLOAD_URL=$(curl -s -X POST "https://git.gookeryoung.cn/api/v1/repos/${REPO}/releases" \
|
||||||
- uses: actions/download-artifact@v8
|
-H "Authorization: token ${{ secrets.GITEA_TOKEN }}" \
|
||||||
with:
|
-H "Content-Type: application/json" \
|
||||||
name: dist
|
-d "{\"tag_name\": \"v${VERSION}\", \"name\": \"v${VERSION}\", \"draft\": false, \"prerelease\": false}" \
|
||||||
path: dist
|
| jq -r '.upload_url')
|
||||||
|
for file in dist/*; do
|
||||||
- uses: pypa/gh-action-pypi-publish@release/v1
|
curl -s -X POST "${UPLOAD_URL}?name=$(basename $file)" \
|
||||||
|
-H "Authorization: token ${{ secrets.GITEA_TOKEN }}" \
|
||||||
release:
|
-H "Content-Type: application/octet-stream" \
|
||||||
needs: [build, publish-pypi]
|
--data-binary @"$file"
|
||||||
runs-on: ubuntu-latest
|
done
|
||||||
steps:
|
env:
|
||||||
- uses: actions/download-artifact@v8
|
GITEA_URL: ${{ secrets.GITEA_URL || 'https://git.gookeryoung.cn' }}
|
||||||
with:
|
|
||||||
name: dist
|
|
||||||
path: dist
|
|
||||||
|
|
||||||
- uses: softprops/action-gh-release@v2
|
|
||||||
with:
|
|
||||||
files: dist/*
|
|
||||||
generate_release_notes: true
|
|
||||||
|
|||||||
@@ -10,3 +10,4 @@ wheels/
|
|||||||
.venv
|
.venv
|
||||||
.coverage
|
.coverage
|
||||||
.idea
|
.idea
|
||||||
|
*_profile.html
|
||||||
|
|||||||
+2
-1
@@ -21,7 +21,7 @@ license = { text = "MIT" }
|
|||||||
name = "pyflowx"
|
name = "pyflowx"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.8"
|
requires-python = ">=3.8"
|
||||||
version = "0.2.12"
|
version = "0.3.0"
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
autofmt = "pyflowx.cli.autofmt:main"
|
autofmt = "pyflowx.cli.autofmt:main"
|
||||||
@@ -38,6 +38,7 @@ packtool = "pyflowx.cli.packtool:main"
|
|||||||
pdftool = "pyflowx.cli.pdftool:main"
|
pdftool = "pyflowx.cli.pdftool:main"
|
||||||
piptool = "pyflowx.cli.piptool:main"
|
piptool = "pyflowx.cli.piptool:main"
|
||||||
pymake = "pyflowx.cli.pymake:main"
|
pymake = "pyflowx.cli.pymake:main"
|
||||||
|
pxp = "pyflowx.cli.profiler:main"
|
||||||
reseticon = "pyflowx.cli.reseticoncache:main"
|
reseticon = "pyflowx.cli.reseticoncache:main"
|
||||||
scrcap = "pyflowx.cli.screenshot:main"
|
scrcap = "pyflowx.cli.screenshot:main"
|
||||||
sglang = "pyflowx.cli.llm.sglang:main"
|
sglang = "pyflowx.cli.llm.sglang:main"
|
||||||
|
|||||||
@@ -82,6 +82,7 @@ from .errors import (
|
|||||||
)
|
)
|
||||||
from .executors import Strategy, run
|
from .executors import Strategy, run
|
||||||
from .graph import Graph, GraphDefaults
|
from .graph import Graph, GraphDefaults
|
||||||
|
from .profiling import ProfileReport, TaskProfile
|
||||||
from .report import RunReport
|
from .report import RunReport
|
||||||
from .runner import CliExitCode, CliRunner
|
from .runner import CliExitCode, CliRunner
|
||||||
from .storage import JSONBackend, MemoryBackend, StateBackend
|
from .storage import JSONBackend, MemoryBackend, StateBackend
|
||||||
@@ -99,7 +100,7 @@ from .task import (
|
|||||||
task_template,
|
task_template,
|
||||||
)
|
)
|
||||||
|
|
||||||
__version__ = "0.3.6"
|
__version__ = "0.4.0"
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"IS_LINUX",
|
"IS_LINUX",
|
||||||
@@ -122,6 +123,7 @@ __all__ = [
|
|||||||
"JSONBackend",
|
"JSONBackend",
|
||||||
"MemoryBackend",
|
"MemoryBackend",
|
||||||
"MissingDependencyError",
|
"MissingDependencyError",
|
||||||
|
"ProfileReport",
|
||||||
"PyFlowXError",
|
"PyFlowXError",
|
||||||
"RetryPolicy",
|
"RetryPolicy",
|
||||||
"RunReport",
|
"RunReport",
|
||||||
@@ -132,6 +134,7 @@ __all__ = [
|
|||||||
"TaskEvent",
|
"TaskEvent",
|
||||||
"TaskFailedError",
|
"TaskFailedError",
|
||||||
"TaskHooks",
|
"TaskHooks",
|
||||||
|
"TaskProfile",
|
||||||
"TaskResult",
|
"TaskResult",
|
||||||
"TaskSpec",
|
"TaskSpec",
|
||||||
"TaskStatus",
|
"TaskStatus",
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import getpass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Literal, get_args
|
from typing import Literal, get_args
|
||||||
|
|
||||||
@@ -254,6 +255,31 @@ def main() -> None:
|
|||||||
allow_upstream_skip=True,
|
allow_upstream_skip=True,
|
||||||
verbose=True,
|
verbose=True,
|
||||||
),
|
),
|
||||||
|
# 安装 Docker
|
||||||
|
px.TaskSpec(
|
||||||
|
"install_docker",
|
||||||
|
cmd=["sudo", "apt", "install", "-y", "docker-compose-v2"],
|
||||||
|
conditions=(BuiltinConditions.IS_LINUX(),),
|
||||||
|
depends_on=("install_mirror",),
|
||||||
|
allow_upstream_skip=True,
|
||||||
|
verbose=True,
|
||||||
|
),
|
||||||
|
px.TaskSpec(
|
||||||
|
"add_docker_group",
|
||||||
|
cmd=["sudo", "usermod", "-aG", "docker", getpass.getuser()],
|
||||||
|
conditions=(BuiltinConditions.IS_LINUX(),),
|
||||||
|
depends_on=("install_docker",),
|
||||||
|
allow_upstream_skip=True,
|
||||||
|
verbose=True,
|
||||||
|
),
|
||||||
|
px.TaskSpec(
|
||||||
|
"refresh_docker_group",
|
||||||
|
cmd=["newgrp", "docker"],
|
||||||
|
conditions=(BuiltinConditions.IS_LINUX(),),
|
||||||
|
depends_on=("add_docker_group",),
|
||||||
|
allow_upstream_skip=True,
|
||||||
|
verbose=True,
|
||||||
|
),
|
||||||
# 设置 Python 环境变量
|
# 设置 Python 环境变量
|
||||||
*setenv_group({
|
*setenv_group({
|
||||||
"PIP_INDEX_URL": PIP_INDEX_URLS[python_mirror],
|
"PIP_INDEX_URL": PIP_INDEX_URLS[python_mirror],
|
||||||
|
|||||||
@@ -0,0 +1,272 @@
|
|||||||
|
"""pxp —— PyFlowX 性能分析器.
|
||||||
|
|
||||||
|
分析包含 ``px`` 调用的 Python 脚本,生成工作流执行性能剖面报告。
|
||||||
|
|
||||||
|
工作原理
|
||||||
|
--------
|
||||||
|
1. 注入 hook:monkey-patch ``pyflowx.run`` / ``pyflowx.executors.run`` /
|
||||||
|
``pyflowx.runner.run``,捕获最后一次执行的 ``Graph`` 与 ``RunReport``。
|
||||||
|
2. 执行目标脚本:用 ``runpy.run_path`` 以 ``__main__`` 身份执行,
|
||||||
|
捕获 ``SystemExit``(脚本可能调 ``sys.exit``)。
|
||||||
|
3. 生成报告:从捕获的 report + graph 构建 :class:`ProfileReport`,
|
||||||
|
默认输出 HTML 并自动打开浏览器。
|
||||||
|
|
||||||
|
使用方式
|
||||||
|
--------
|
||||||
|
# 分析 pymake.py,生成 HTML 报告并打开浏览器
|
||||||
|
pxp pymake.py
|
||||||
|
|
||||||
|
# 传递参数给被分析脚本(用 -- 分隔)
|
||||||
|
pxp pymake.py -- t
|
||||||
|
|
||||||
|
# 指定输出文件
|
||||||
|
pxp pymake.py -o report.html
|
||||||
|
|
||||||
|
# 不打开浏览器
|
||||||
|
pxp pymake.py --no-browser
|
||||||
|
|
||||||
|
# 输出纯文本报告
|
||||||
|
pxp pymake.py -E text
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
__all__ = ["main"]
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import runpy
|
||||||
|
import sys
|
||||||
|
import webbrowser
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from .. import executors as _executors
|
||||||
|
from .. import runner as _runner
|
||||||
|
from ..profiling import ProfileReport
|
||||||
|
from ..report import RunReport
|
||||||
|
|
||||||
|
|
||||||
|
def _build_parser() -> argparse.ArgumentParser:
|
||||||
|
"""构建参数解析器。"""
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
prog="pxp",
|
||||||
|
description="PyFlowX 性能分析器:分析包含 px 调用的脚本,生成性能剖面报告。",
|
||||||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
|
epilog=(
|
||||||
|
"示例:\n"
|
||||||
|
" pxp pymake.py # 分析并打开 HTML 报告\n"
|
||||||
|
" pxp pymake.py -- t # 传递参数 t 给脚本\n"
|
||||||
|
" pxp pymake.py -E text # 输出纯文本报告\n"
|
||||||
|
" pxp pymake.py -o out.html # 指定输出文件\n"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
_ = parser.add_argument(
|
||||||
|
"--export",
|
||||||
|
"-E",
|
||||||
|
choices=["html", "text"],
|
||||||
|
default="html",
|
||||||
|
help="导出格式(默认: html)",
|
||||||
|
)
|
||||||
|
_ = parser.add_argument(
|
||||||
|
"--no-browser",
|
||||||
|
action="store_true",
|
||||||
|
help="不自动打开浏览器(仅 HTML 格式有效)",
|
||||||
|
)
|
||||||
|
_ = parser.add_argument(
|
||||||
|
"-o",
|
||||||
|
"--output",
|
||||||
|
help="输出文件路径(默认: <script>_profile.html)",
|
||||||
|
)
|
||||||
|
return parser
|
||||||
|
|
||||||
|
|
||||||
|
def _capture_px_run() -> dict[str, Any]:
|
||||||
|
"""注入 hook 捕获 px.run() 调用。
|
||||||
|
|
||||||
|
返回一个字典,``run()`` 执行后填充 ``graph`` 与 ``report``。
|
||||||
|
同时返回还原函数用于 finally 块。
|
||||||
|
|
||||||
|
Note
|
||||||
|
-----
|
||||||
|
需同时 patch 三处引用:
|
||||||
|
* ``pyflowx.executors.run`` —— 实际实现
|
||||||
|
* ``pyflowx.runner.run`` —— ``CliRunner`` 直接 import 的引用
|
||||||
|
* ``pyflowx.run`` —— 顶层包导出的引用(用户脚本常用 ``px.run()``)
|
||||||
|
|
||||||
|
另外 patch ``RunReport.__init__`` 以捕获 ``run()`` 内部创建的 report 实例。
|
||||||
|
这对于 ``run()`` 抛出 ``TaskFailedError`` 的场景至关重要:此时 ``run()``
|
||||||
|
不会正常返回 report,但 report 对象已在内部创建并填充了已执行任务的结果。
|
||||||
|
通过 ``capture_enabled`` 标志确保只在 ``patched_run`` 调用期间捕获。
|
||||||
|
"""
|
||||||
|
captured: dict[str, Any] = {}
|
||||||
|
original_exec_run = _executors.run
|
||||||
|
original_runner_run = _runner.run
|
||||||
|
# 惰性获取顶层 pyflowx.run 引用(避免循环导入)
|
||||||
|
import pyflowx as px_mod
|
||||||
|
|
||||||
|
original_px_run = px_mod.run
|
||||||
|
original_report_init = RunReport.__init__
|
||||||
|
capture_enabled = [False]
|
||||||
|
|
||||||
|
def patched_report_init(self: RunReport, *args: Any, **kwargs: Any) -> None:
|
||||||
|
original_report_init(self, *args, **kwargs)
|
||||||
|
if capture_enabled[0]:
|
||||||
|
captured["report"] = self
|
||||||
|
|
||||||
|
RunReport.__init__ = patched_report_init # type: ignore[assignment]
|
||||||
|
|
||||||
|
def patched_run(graph: Any, *args: Any, **kwargs: Any) -> RunReport:
|
||||||
|
captured["graph"] = graph
|
||||||
|
capture_enabled[0] = True
|
||||||
|
try:
|
||||||
|
report = original_exec_run(graph, *args, **kwargs)
|
||||||
|
# 正常返回时确保 captured["report"] 是返回的 report
|
||||||
|
captured["report"] = report
|
||||||
|
return report
|
||||||
|
finally:
|
||||||
|
capture_enabled[0] = False
|
||||||
|
|
||||||
|
# patch 所有引用 run 的入口
|
||||||
|
_executors.run = patched_run # type: ignore[assignment]
|
||||||
|
_runner.run = patched_run # type: ignore[assignment]
|
||||||
|
px_mod.run = patched_run # type: ignore[assignment]
|
||||||
|
|
||||||
|
def _restore() -> None:
|
||||||
|
_executors.run = original_exec_run # type: ignore[assignment]
|
||||||
|
_runner.run = original_runner_run # type: ignore[assignment]
|
||||||
|
px_mod.run = original_px_run # type: ignore[assignment]
|
||||||
|
RunReport.__init__ = original_report_init # type: ignore[assignment]
|
||||||
|
|
||||||
|
captured["_restore"] = _restore
|
||||||
|
return captured
|
||||||
|
|
||||||
|
|
||||||
|
def _run_target_script(script: Path, script_args: list[str]) -> dict[str, Any]:
|
||||||
|
"""执行目标脚本。
|
||||||
|
|
||||||
|
将脚本所在目录加入 ``sys.path``,设置 ``sys.argv``,然后用
|
||||||
|
``runpy.run_path`` 以 ``__main__`` 身份执行。捕获 ``SystemExit``。
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
dict[str, Any]
|
||||||
|
脚本模块的全局变量字典(含 ``main`` 等定义)。
|
||||||
|
"""
|
||||||
|
sys.argv = [str(script), *script_args]
|
||||||
|
script_dir = str(script.parent.resolve())
|
||||||
|
if script_dir not in sys.path:
|
||||||
|
sys.path.insert(0, script_dir)
|
||||||
|
return runpy.run_path(str(script), run_name="__main__")
|
||||||
|
|
||||||
|
|
||||||
|
def _try_call_main(module_globals: dict[str, Any]) -> None:
|
||||||
|
"""若模块定义了 ``main`` 可调用对象,调用它。
|
||||||
|
|
||||||
|
用于脚本无 ``if __name__ == "__main__"`` 块的场景(如通过 entry points
|
||||||
|
注册的 CLI 工具脚本)。``main`` 通常调用 ``CliRunner.run_cli()``,
|
||||||
|
后者读取 ``sys.argv[1:]`` 执行对应命令。
|
||||||
|
"""
|
||||||
|
main_fn = module_globals.get("main")
|
||||||
|
if callable(main_fn):
|
||||||
|
main_fn()
|
||||||
|
|
||||||
|
|
||||||
|
def _output_report(
|
||||||
|
profile: ProfileReport,
|
||||||
|
export: str,
|
||||||
|
output: str | None,
|
||||||
|
script_stem: str,
|
||||||
|
no_browser: bool,
|
||||||
|
) -> None:
|
||||||
|
"""输出性能报告。"""
|
||||||
|
if export == "text":
|
||||||
|
print(profile.describe())
|
||||||
|
return
|
||||||
|
|
||||||
|
# HTML 格式
|
||||||
|
html = profile.to_html()
|
||||||
|
if output:
|
||||||
|
out_path = Path(output)
|
||||||
|
else:
|
||||||
|
out_path = Path.cwd() / f"{script_stem}_profile.html"
|
||||||
|
out_path.write_text(html, encoding="utf-8")
|
||||||
|
print(f"HTML 报告已生成: {out_path}")
|
||||||
|
|
||||||
|
if not no_browser:
|
||||||
|
try:
|
||||||
|
webbrowser.open(f"file://{out_path.resolve()}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"警告:无法打开浏览器: {e}", file=sys.stderr)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""pxp CLI 入口。"""
|
||||||
|
parser = _build_parser()
|
||||||
|
pxp_args, remaining = parser.parse_known_args()
|
||||||
|
|
||||||
|
if not remaining:
|
||||||
|
parser.print_help()
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
script_str = remaining[0]
|
||||||
|
script_args = remaining[1:]
|
||||||
|
script_path = Path(script_str).resolve()
|
||||||
|
|
||||||
|
if not script_path.is_file():
|
||||||
|
print(f"错误:脚本不存在: {script_path}", file=sys.stderr)
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
# 注入 hook
|
||||||
|
captured = _capture_px_run()
|
||||||
|
|
||||||
|
# 执行目标脚本
|
||||||
|
print(f"正在分析: {script_path}")
|
||||||
|
if script_args:
|
||||||
|
print(f"脚本参数: {script_args}")
|
||||||
|
print("-" * 60)
|
||||||
|
|
||||||
|
module_globals: dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
module_globals = _run_target_script(script_path, script_args)
|
||||||
|
except SystemExit:
|
||||||
|
# 脚本调用了 sys.exit,正常情况
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
print(f"警告:脚本执行抛出异常: {e}", file=sys.stderr)
|
||||||
|
|
||||||
|
# 若脚本执行未捕获到 run(),尝试调用模块的 main() 函数
|
||||||
|
# (适用于无 ``if __name__ == "__main__"`` 块的 CLI 脚本)
|
||||||
|
if captured.get("report") is None and module_globals:
|
||||||
|
try:
|
||||||
|
_try_call_main(module_globals)
|
||||||
|
except SystemExit:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
print(f"警告:调用 main() 抛出异常: {e}", file=sys.stderr)
|
||||||
|
|
||||||
|
# 还原 hook
|
||||||
|
restore = captured.pop("_restore", None)
|
||||||
|
if restore is not None:
|
||||||
|
restore()
|
||||||
|
|
||||||
|
# 检查是否捕获到 run() 调用
|
||||||
|
report = captured.get("report")
|
||||||
|
graph = captured.get("graph")
|
||||||
|
if report is None or graph is None:
|
||||||
|
print("错误:未捕获到 px.run() 调用,无法生成性能报告", file=sys.stderr)
|
||||||
|
print("请确保脚本通过 px.run() 或 CliRunner 执行任务流图。", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# 生成报告
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
_output_report(
|
||||||
|
profile,
|
||||||
|
export=pxp_args.export,
|
||||||
|
output=pxp_args.output,
|
||||||
|
script_stem=script_path.stem,
|
||||||
|
no_browser=pxp_args.no_browser,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
+24
-15
@@ -6,39 +6,48 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
import pyflowx as px
|
import pyflowx as px
|
||||||
from pyflowx.conditions import Constants
|
from pyflowx.conditions import Constants
|
||||||
|
|
||||||
|
# 项目根目录(pymake.py 在 src/pyflowx/cli,向上四层到达根目录)
|
||||||
|
ROOT_DIR = Path(__file__).parent.parent.parent.parent
|
||||||
|
|
||||||
MATURIN_BUILD_COMMAND = ["maturin", "build", "-r"]
|
MATURIN_BUILD_COMMAND = ["maturin", "build", "-r"]
|
||||||
if Constants.IS_WINDOWS:
|
if Constants.IS_WINDOWS:
|
||||||
MATURIN_BUILD_COMMAND.extend(["--target", "x86_64-win7-windows-msvc", "-Zbuild-std", "-i", "python3.8"])
|
MATURIN_BUILD_COMMAND.extend(["--target", "x86_64-win7-windows-msvc", "-Zbuild-std", "-i", "python3.8"])
|
||||||
|
|
||||||
# 扁平注册所有任务(px.cmd 自动从命令前两段推导 name)
|
# 扁平注册所有任务(px.cmd 自动从命令前两段推导 name)
|
||||||
|
# 所有任务指定 cwd=ROOT_DIR,确保在项目根目录执行
|
||||||
tasks: list[px.TaskSpec] = [
|
tasks: list[px.TaskSpec] = [
|
||||||
px.cmd(["uv", "build"]),
|
px.cmd(["uv", "build"], cwd=ROOT_DIR),
|
||||||
px.cmd(MATURIN_BUILD_COMMAND),
|
px.cmd(MATURIN_BUILD_COMMAND, cwd=ROOT_DIR),
|
||||||
px.cmd(["uv", "sync"]),
|
px.cmd(["uv", "sync"], cwd=ROOT_DIR),
|
||||||
px.cmd(["gitt", "c"], name="git_clean"),
|
px.cmd(["gitt", "c"], name="git_clean", cwd=ROOT_DIR),
|
||||||
px.cmd(
|
px.cmd(
|
||||||
["pytest", "-m", "not slow", "-n", "8", "--dist", "loadfile", "--color=yes", "--durations=10"],
|
["pytest", "-m", "not slow", "-n", "8", "--dist", "loadfile", "--color=yes", "--durations=10"],
|
||||||
name="test",
|
name="test",
|
||||||
|
cwd=ROOT_DIR,
|
||||||
),
|
),
|
||||||
px.cmd(
|
px.cmd(
|
||||||
["pytest", "-m", "not slow", "--dist", "loadfile", "--color=yes", "--durations=10"],
|
["pytest", "-m", "not slow", "--dist", "loadfile", "--color=yes", "--durations=10"],
|
||||||
name="test_fast",
|
name="test_fast",
|
||||||
|
cwd=ROOT_DIR,
|
||||||
),
|
),
|
||||||
px.cmd(
|
px.cmd(
|
||||||
["pytest", "--cov", "-n", "8", "--dist", "loadfile", "--tb=short", "-v", "--color=yes", "--durations=10"],
|
["pytest", "--cov", "-n", "8", "--dist", "loadfile", "--tb=short", "-v", "--color=yes", "--durations=10"],
|
||||||
name="test_coverage",
|
name="test_coverage",
|
||||||
|
cwd=ROOT_DIR,
|
||||||
),
|
),
|
||||||
px.cmd(["pyrefly", "check", "."]),
|
px.cmd(["pyrefly", "check", "."], cwd=ROOT_DIR),
|
||||||
px.cmd(["git", "add", "-A"], name="git_add_all"),
|
px.cmd(["git", "add", "-A"], name="git_add_all", cwd=ROOT_DIR),
|
||||||
px.cmd(["bumpversion"]),
|
px.cmd(["bumpversion"], cwd=ROOT_DIR),
|
||||||
px.cmd(["bumpversion", "minor"]),
|
px.cmd(["bumpversion", "minor"], cwd=ROOT_DIR),
|
||||||
px.cmd(["git", "push"]),
|
px.cmd(["git", "push"], cwd=ROOT_DIR),
|
||||||
px.cmd(["git", "push", "--tags"], name="git_push_tags"),
|
px.cmd(["git", "push", "--tags"], name="git_push_tags", cwd=ROOT_DIR),
|
||||||
px.cmd(["hatch", "publish"], name="publish_python"),
|
px.cmd(["hatch", "publish"], name="publish_python", cwd=ROOT_DIR),
|
||||||
px.cmd(["twine", "upload", "--disable-progress-bar"], name="twine_publish"),
|
px.cmd(["twine", "upload", "--disable-progress-bar"], name="twine_publish", cwd=ROOT_DIR),
|
||||||
]
|
]
|
||||||
|
|
||||||
# 单任务别名(alias 名与任务名相同):直接内联 TaskSpec,避免 str 自引用
|
# 单任务别名(alias 名与任务名相同):直接内联 TaskSpec,避免 str 自引用
|
||||||
@@ -55,13 +64,13 @@ aliases: dict[str, str | list[str | px.TaskSpec] | px.TaskSpec | px.Graph] = {
|
|||||||
"bump": ["c", "tc", "git_add_all", "bumpversion"],
|
"bump": ["c", "tc", "git_add_all", "bumpversion"],
|
||||||
"bumpmi": "bumpversion_minor",
|
"bumpmi": "bumpversion_minor",
|
||||||
"cov": ["git_clean", "test_coverage"],
|
"cov": ["git_clean", "test_coverage"],
|
||||||
"doc": px.cmd(["sphinx-build", "-b", "html", "docs", "docs/_build"], name="doc"),
|
"doc": px.cmd(["sphinx-build", "-b", "html", "docs", "docs/_build"], name="doc", cwd=ROOT_DIR),
|
||||||
"lint": px.cmd(["ruff", "check", "--fix", "--unsafe-fixes"], name="lint"),
|
"lint": px.cmd(["ruff", "check", "--fix", "--unsafe-fixes"], name="lint", cwd=ROOT_DIR),
|
||||||
"pb": ["twine_publish", "publish_python"],
|
"pb": ["twine_publish", "publish_python"],
|
||||||
"t": "test",
|
"t": "test",
|
||||||
"tf": "test_fast",
|
"tf": "test_fast",
|
||||||
"tc": ["pyrefly_check", "lint"],
|
"tc": ["pyrefly_check", "lint"],
|
||||||
"tox": px.cmd(["tox", "-p", "auto"], name="tox"),
|
"tox": px.cmd(["tox", "-p", "auto"], name="tox", cwd=ROOT_DIR),
|
||||||
# 发布命令
|
# 发布命令
|
||||||
"p": ["git_clean", "git_push", "git_push_tags"],
|
"p": ["git_clean", "git_push", "git_push_tags"],
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,705 @@
|
|||||||
|
"""工作流执行性能评估。
|
||||||
|
|
||||||
|
基于 :class:`~pyflowx.report.RunReport` 中已有的 ``started_at`` /
|
||||||
|
``finished_at`` 时间戳进行离线分析,**零运行时开销**——不修改执行流程,
|
||||||
|
不注册回调,不引入额外计时器。
|
||||||
|
|
||||||
|
核心指标
|
||||||
|
--------
|
||||||
|
* **任务级**:每个任务的 wall-clock 耗时、状态、重试次数、等待时间
|
||||||
|
(从最早依赖完成到本任务开始)。
|
||||||
|
* **图级**:总耗时(wall-clock)、关键路径耗时(理论最短耗时)、
|
||||||
|
并行度效率(关键路径耗时 / 总耗时)。
|
||||||
|
* **关键路径**:从源点到汇点的最长依赖路径,识别真正的串行瓶颈。
|
||||||
|
* **并行度**:基于时间线重叠计算瞬时并行度,给出平均并行度与峰值并行度。
|
||||||
|
* **瓶颈识别**:按耗时排序的 Top-N 任务。
|
||||||
|
|
||||||
|
设计原则
|
||||||
|
--------
|
||||||
|
* 数据来源于 ``RunReport`` + ``Graph``,无副作用。
|
||||||
|
* 计算复杂度 O(V+E):拓扑排序 + 单次松弛,适合大规模图。
|
||||||
|
* 所有时间戳用 ``datetime``,与 :class:`TaskResult` 保持一致。
|
||||||
|
|
||||||
|
快速上手
|
||||||
|
--------
|
||||||
|
import pyflowx as px
|
||||||
|
|
||||||
|
report = px.run(graph)
|
||||||
|
profile = px.ProfileReport.from_report(report, graph)
|
||||||
|
print(profile.describe())
|
||||||
|
bottlenecks = profile.top_bottlenecks(3)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"ProfileReport",
|
||||||
|
"TaskProfile",
|
||||||
|
]
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from .graph import Graph
|
||||||
|
from .report import RunReport
|
||||||
|
from .task import TaskResult, TaskStatus
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class TaskProfile:
|
||||||
|
"""单个任务的性能剖面。
|
||||||
|
|
||||||
|
属性
|
||||||
|
----
|
||||||
|
name:
|
||||||
|
任务名。
|
||||||
|
status:
|
||||||
|
终态(SUCCESS/FAILED/SKIPPED)。
|
||||||
|
duration:
|
||||||
|
wall-clock 执行耗时(秒)。SKIPPED 任务为 0.0。
|
||||||
|
attempts:
|
||||||
|
尝试次数(含首次)。
|
||||||
|
wait_time:
|
||||||
|
从最早硬依赖完成到本任务开始的等待时间(秒)。
|
||||||
|
无硬依赖或 SKIPPED 时为 0.0。
|
||||||
|
is_on_critical_path:
|
||||||
|
是否位于关键路径上。
|
||||||
|
deps:
|
||||||
|
硬依赖任务名列表。
|
||||||
|
"""
|
||||||
|
|
||||||
|
name: str
|
||||||
|
status: TaskStatus
|
||||||
|
duration: float
|
||||||
|
attempts: int
|
||||||
|
wait_time: float
|
||||||
|
is_on_critical_path: bool
|
||||||
|
deps: tuple[str, ...]
|
||||||
|
|
||||||
|
def to_dict(self) -> dict[str, Any]:
|
||||||
|
"""转为 JSON 友好的字典。"""
|
||||||
|
return {
|
||||||
|
"name": self.name,
|
||||||
|
"status": self.status.value,
|
||||||
|
"duration_seconds": round(self.duration, 6),
|
||||||
|
"attempts": self.attempts,
|
||||||
|
"wait_time_seconds": round(self.wait_time, 6),
|
||||||
|
"is_on_critical_path": self.is_on_critical_path,
|
||||||
|
"deps": list(self.deps),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class ProfileReport:
|
||||||
|
"""工作流执行的性能剖面报告。
|
||||||
|
|
||||||
|
通过 :meth:`from_report` 从 :class:`RunReport` + :class:`Graph` 构建。
|
||||||
|
所有字段在构造时一次性计算完毕,后续访问为 O(1)。
|
||||||
|
"""
|
||||||
|
|
||||||
|
tasks: tuple[TaskProfile, ...]
|
||||||
|
"""所有任务的性能剖面(按拓扑序)。"""
|
||||||
|
|
||||||
|
total_duration: float
|
||||||
|
"""整次运行的 wall-clock 耗时(秒)。"""
|
||||||
|
|
||||||
|
critical_path_duration: float
|
||||||
|
"""关键路径耗时(秒):从最早任务开始到最晚任务结束的最长依赖路径。"""
|
||||||
|
|
||||||
|
critical_path: tuple[str, ...]
|
||||||
|
"""关键路径上的任务名序列(按执行顺序)。"""
|
||||||
|
|
||||||
|
avg_parallelism: float
|
||||||
|
"""平均并行度 = 任务总耗时 / wall-clock 总耗时。"""
|
||||||
|
|
||||||
|
peak_parallelism: int
|
||||||
|
"""峰值并行度:任一时刻同时运行的任务数最大值。"""
|
||||||
|
|
||||||
|
parallelism_efficiency: float
|
||||||
|
"""并行度效率 = 关键路径耗时 / wall-clock 总耗时。``1.0`` 表示完全串行,
|
||||||
|
越大表示并行化收益越低(瓶颈在关键路径上)。"""
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
# 构建
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
@classmethod
|
||||||
|
def from_report(cls, report: RunReport, graph: Graph) -> ProfileReport:
|
||||||
|
"""从运行报告与图构建性能剖面。
|
||||||
|
|
||||||
|
参数
|
||||||
|
----
|
||||||
|
report:
|
||||||
|
已完成的 :class:`RunReport`,需包含 ``started_at``/``finished_at``。
|
||||||
|
graph:
|
||||||
|
对应的 :class:`Graph`,用于依赖关系与关键路径分析。
|
||||||
|
|
||||||
|
Note
|
||||||
|
-----
|
||||||
|
本方法不修改 ``report`` 或 ``graph``,纯函数式计算。
|
||||||
|
"""
|
||||||
|
task_profiles = cls._build_task_profiles(report, graph)
|
||||||
|
total_duration = cls._calc_total_duration(report)
|
||||||
|
critical_path, critical_duration = cls._calc_critical_path(graph, report)
|
||||||
|
avg_par, peak_par = cls._calc_parallelism(report)
|
||||||
|
efficiency = critical_duration / total_duration if total_duration > 0 else 0.0
|
||||||
|
|
||||||
|
# 标记关键路径上的任务
|
||||||
|
critical_set = set(critical_path)
|
||||||
|
marked = tuple(
|
||||||
|
TaskProfile(
|
||||||
|
name=t.name,
|
||||||
|
status=t.status,
|
||||||
|
duration=t.duration,
|
||||||
|
attempts=t.attempts,
|
||||||
|
wait_time=t.wait_time,
|
||||||
|
is_on_critical_path=t.name in critical_set,
|
||||||
|
deps=t.deps,
|
||||||
|
)
|
||||||
|
for t in task_profiles
|
||||||
|
)
|
||||||
|
|
||||||
|
return cls(
|
||||||
|
tasks=marked,
|
||||||
|
total_duration=total_duration,
|
||||||
|
critical_path_duration=critical_duration,
|
||||||
|
critical_path=critical_path,
|
||||||
|
avg_parallelism=avg_par,
|
||||||
|
peak_parallelism=peak_par,
|
||||||
|
parallelism_efficiency=efficiency,
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _build_task_profiles(report: RunReport, graph: Graph) -> tuple[TaskProfile, ...]:
|
||||||
|
"""构建每个任务的性能剖面。"""
|
||||||
|
profiles: list[TaskProfile] = []
|
||||||
|
for name, result in report.results.items():
|
||||||
|
spec = graph.specs.get(name)
|
||||||
|
deps = tuple(spec.depends_on) if spec is not None else ()
|
||||||
|
duration = result.duration or 0.0
|
||||||
|
wait_time = ProfileReport._calc_wait_time(result, deps, report)
|
||||||
|
profiles.append(
|
||||||
|
TaskProfile(
|
||||||
|
name=name,
|
||||||
|
status=result.status,
|
||||||
|
duration=duration,
|
||||||
|
attempts=result.attempts,
|
||||||
|
wait_time=wait_time,
|
||||||
|
is_on_critical_path=False, # 后续标记
|
||||||
|
deps=deps,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return tuple(profiles)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _calc_wait_time(
|
||||||
|
result: TaskResult[Any],
|
||||||
|
deps: tuple[str, ...],
|
||||||
|
report: RunReport,
|
||||||
|
) -> float:
|
||||||
|
"""计算等待时间:从最早依赖完成到本任务开始。
|
||||||
|
|
||||||
|
无硬依赖、SKIPPED 任务或时间戳缺失时返回 0.0。
|
||||||
|
"""
|
||||||
|
if not deps or result.started_at is None or result.status == TaskStatus.SKIPPED:
|
||||||
|
return 0.0
|
||||||
|
# 找出所有已完成依赖的最晚完成时间
|
||||||
|
dep_end_times: list[datetime] = []
|
||||||
|
for dep in deps:
|
||||||
|
dep_result = report.results.get(dep)
|
||||||
|
if dep_result is not None and dep_result.finished_at is not None:
|
||||||
|
dep_end_times.append(dep_result.finished_at)
|
||||||
|
if not dep_end_times:
|
||||||
|
return 0.0
|
||||||
|
latest_dep_end = max(dep_end_times)
|
||||||
|
delta = (result.started_at - latest_dep_end).total_seconds()
|
||||||
|
return max(0.0, delta)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _calc_total_duration(report: RunReport) -> float:
|
||||||
|
"""计算 wall-clock 总耗时:最早开始到最晚结束。"""
|
||||||
|
starts: list[datetime] = []
|
||||||
|
ends: list[datetime] = []
|
||||||
|
for r in report.results.values():
|
||||||
|
if r.started_at is not None:
|
||||||
|
starts.append(r.started_at)
|
||||||
|
if r.finished_at is not None:
|
||||||
|
ends.append(r.finished_at)
|
||||||
|
if not starts or not ends:
|
||||||
|
return 0.0
|
||||||
|
return (max(ends) - min(starts)).total_seconds()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _calc_critical_path(graph: Graph, report: RunReport) -> tuple[tuple[str, ...], float]:
|
||||||
|
"""计算关键路径:DAG 最长路径(按实际执行耗时)。
|
||||||
|
|
||||||
|
使用拓扑排序 + 动态规划,O(V+E)。SKIPPED 任务耗时按 0 计。
|
||||||
|
"""
|
||||||
|
# 构建耗时映射
|
||||||
|
durations: dict[str, float] = {}
|
||||||
|
for name, result in report.results.items():
|
||||||
|
durations[name] = result.duration or 0.0
|
||||||
|
|
||||||
|
# 拓扑序(使用 graph.layers 保证与分层一致)
|
||||||
|
try:
|
||||||
|
layers = graph.layers()
|
||||||
|
except Exception:
|
||||||
|
# 图校验失败时回退为空
|
||||||
|
return (), 0.0
|
||||||
|
|
||||||
|
# earliest_finish[name] = duration[name] + max(earliest_finish[dep] for dep in deps)
|
||||||
|
earliest_finish: dict[str, float] = {}
|
||||||
|
predecessor: dict[str, str | None] = {}
|
||||||
|
|
||||||
|
for layer in layers:
|
||||||
|
for name in layer:
|
||||||
|
spec = graph.specs.get(name)
|
||||||
|
deps = spec.depends_on if spec is not None else ()
|
||||||
|
if not deps:
|
||||||
|
earliest_finish[name] = durations.get(name, 0.0)
|
||||||
|
predecessor[name] = None
|
||||||
|
else:
|
||||||
|
best_dep: str | None = None
|
||||||
|
best_ef = 0.0
|
||||||
|
for dep in deps:
|
||||||
|
ef = earliest_finish.get(dep, 0.0)
|
||||||
|
if ef >= best_ef:
|
||||||
|
best_ef = ef
|
||||||
|
best_dep = dep
|
||||||
|
earliest_finish[name] = best_ef + durations.get(name, 0.0)
|
||||||
|
predecessor[name] = best_dep
|
||||||
|
|
||||||
|
if not earliest_finish:
|
||||||
|
return (), 0.0
|
||||||
|
|
||||||
|
# 找到 earliest_finish 最大的节点作为终点
|
||||||
|
end_node = max(earliest_finish, key=lambda n: earliest_finish[n])
|
||||||
|
total = earliest_finish[end_node]
|
||||||
|
|
||||||
|
# 回溯关键路径
|
||||||
|
path: list[str] = []
|
||||||
|
node: str | None = end_node
|
||||||
|
while node is not None:
|
||||||
|
path.append(node)
|
||||||
|
node = predecessor.get(node)
|
||||||
|
path.reverse()
|
||||||
|
|
||||||
|
return tuple(path), total
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _calc_parallelism(report: RunReport) -> tuple[float, int]:
|
||||||
|
"""计算平均并行度与峰值并行度。
|
||||||
|
|
||||||
|
基于时间线扫描:将每个任务的 [started_at, finished_at] 区间
|
||||||
|
转为事件点(+1/-1),排序后扫描得到瞬时并行度序列。
|
||||||
|
|
||||||
|
返回 (avg_parallelism, peak_parallelism)。
|
||||||
|
无有效时间戳时返回 (0.0, 0)。
|
||||||
|
"""
|
||||||
|
events: list[tuple[float, int]] = [] # (timestamp, delta)
|
||||||
|
for r in report.results.values():
|
||||||
|
if r.started_at is None or r.finished_at is None:
|
||||||
|
continue
|
||||||
|
if r.status == TaskStatus.SKIPPED:
|
||||||
|
continue
|
||||||
|
start_ts = r.started_at.timestamp()
|
||||||
|
end_ts = r.finished_at.timestamp()
|
||||||
|
if end_ts <= start_ts:
|
||||||
|
continue
|
||||||
|
events.append((start_ts, 1))
|
||||||
|
events.append((end_ts, -1))
|
||||||
|
|
||||||
|
if not events:
|
||||||
|
return 0.0, 0
|
||||||
|
|
||||||
|
# 排序:同一时间点先处理结束(-1)再处理开始(+1),避免虚假峰值
|
||||||
|
events.sort(key=lambda e: (e[0], e[1]))
|
||||||
|
|
||||||
|
current = 0
|
||||||
|
peak = 0
|
||||||
|
# 加权面积用于计算平均并行度
|
||||||
|
area = 0.0
|
||||||
|
prev_ts = events[0][0]
|
||||||
|
for ts, delta in events:
|
||||||
|
if ts > prev_ts:
|
||||||
|
area += current * (ts - prev_ts)
|
||||||
|
current += delta
|
||||||
|
peak = max(peak, current)
|
||||||
|
prev_ts = ts
|
||||||
|
|
||||||
|
total_span = events[-1][0] - events[0][0]
|
||||||
|
avg = area / total_span if total_span > 0 else 0.0
|
||||||
|
return avg, peak
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
# 查询
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
def task(self, name: str) -> TaskProfile:
|
||||||
|
"""返回指定任务的剖面。不存在则 ``KeyError``。"""
|
||||||
|
for t in self.tasks:
|
||||||
|
if t.name == name:
|
||||||
|
return t
|
||||||
|
raise KeyError(name)
|
||||||
|
|
||||||
|
def top_bottlenecks(self, n: int = 5) -> tuple[TaskProfile, ...]:
|
||||||
|
"""返回耗时最长的 Top-N 任务(按 duration 降序)。
|
||||||
|
|
||||||
|
参数
|
||||||
|
----
|
||||||
|
n:
|
||||||
|
返回数量。``n <= 0`` 返回空元组。
|
||||||
|
"""
|
||||||
|
if n <= 0:
|
||||||
|
return ()
|
||||||
|
return tuple(sorted(self.tasks, key=lambda t: t.duration, reverse=True)[:n])
|
||||||
|
|
||||||
|
def critical_tasks(self) -> tuple[TaskProfile, ...]:
|
||||||
|
"""返回关键路径上的所有任务(按路径顺序)。"""
|
||||||
|
critical_set = set(self.critical_path)
|
||||||
|
# 保持关键路径顺序
|
||||||
|
order = {name: i for i, name in enumerate(self.critical_path)}
|
||||||
|
return tuple(sorted((t for t in self.tasks if t.name in critical_set), key=lambda t: order[t.name]))
|
||||||
|
|
||||||
|
def failed_tasks(self) -> tuple[TaskProfile, ...]:
|
||||||
|
"""返回 FAILED 状态的任务。"""
|
||||||
|
return tuple(t for t in self.tasks if t.status == TaskStatus.FAILED)
|
||||||
|
|
||||||
|
def skipped_tasks(self) -> tuple[TaskProfile, ...]:
|
||||||
|
"""返回 SKIPPED 状态的任务。"""
|
||||||
|
return tuple(t for t in self.tasks if t.status == TaskStatus.SKIPPED)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
# 输出
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
def to_dict(self) -> dict[str, Any]:
|
||||||
|
"""转为 JSON 友好的字典。"""
|
||||||
|
return {
|
||||||
|
"tasks": [t.to_dict() for t in self.tasks],
|
||||||
|
"total_duration_seconds": round(self.total_duration, 6),
|
||||||
|
"critical_path_duration_seconds": round(self.critical_path_duration, 6),
|
||||||
|
"critical_path": list(self.critical_path),
|
||||||
|
"avg_parallelism": round(self.avg_parallelism, 4),
|
||||||
|
"peak_parallelism": self.peak_parallelism,
|
||||||
|
"parallelism_efficiency": round(self.parallelism_efficiency, 4),
|
||||||
|
"bottlenecks": [t.to_dict() for t in self.top_bottlenecks(5)],
|
||||||
|
}
|
||||||
|
|
||||||
|
def to_html(self) -> str:
|
||||||
|
"""生成自包含的 HTML 报告(含 CSS,无外部依赖)。
|
||||||
|
|
||||||
|
报告含:图级指标卡片、关键路径、时间线甘特图、Top 瓶颈表格、
|
||||||
|
全部任务表格。适合直接用浏览器打开查看。
|
||||||
|
"""
|
||||||
|
return _render_html(self)
|
||||||
|
|
||||||
|
def describe(self) -> str:
|
||||||
|
lines: list[str] = []
|
||||||
|
lines.append("=" * 70)
|
||||||
|
lines.append("PyFlowX 性能剖面报告")
|
||||||
|
lines.append("=" * 70)
|
||||||
|
lines.append("")
|
||||||
|
lines.append("【图级指标】")
|
||||||
|
lines.append(f" 总耗时 (wall-clock): {self.total_duration:.3f}s")
|
||||||
|
lines.append(f" 关键路径耗时: {self.critical_path_duration:.3f}s")
|
||||||
|
lines.append(f" 平均并行度: {self.avg_parallelism:.2f}")
|
||||||
|
lines.append(f" 峰值并行度: {self.peak_parallelism}")
|
||||||
|
lines.append(f" 并行度效率: {self.parallelism_efficiency:.2%}")
|
||||||
|
lines.append(f" 任务总数: {len(self.tasks)}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
# 关键路径
|
||||||
|
lines.append("【关键路径】")
|
||||||
|
if self.critical_path:
|
||||||
|
lines.append(f" {' -> '.join(self.critical_path)}")
|
||||||
|
else:
|
||||||
|
lines.append(" (无)")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
# Top 瓶颈
|
||||||
|
bottlenecks = self.top_bottlenecks(5)
|
||||||
|
lines.append(f"【Top {len(bottlenecks)} 瓶颈任务】")
|
||||||
|
if bottlenecks:
|
||||||
|
lines.append(f" {'任务':<30} {'耗时':>10} {'等待':>10} {'尝试':>6} {'关键路径':>8} {'状态':>8}")
|
||||||
|
lines.append(f" {'-' * 30} {'-' * 10} {'-' * 10} {'-' * 6} {'-' * 8} {'-' * 8}")
|
||||||
|
for t in bottlenecks:
|
||||||
|
critical_flag = "✓" if t.is_on_critical_path else ""
|
||||||
|
lines.append(
|
||||||
|
f" {t.name:<30} {t.duration:>9.3f}s {t.wait_time:>9.3f}s {t.attempts:>6} "
|
||||||
|
f"{critical_flag:>8} {t.status.value:>8}",
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
lines.append(" (无)")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
# 全部任务详情
|
||||||
|
lines.append("【全部任务】")
|
||||||
|
if self.tasks:
|
||||||
|
lines.append(f" {'任务':<30} {'耗时':>10} {'等待':>10} {'尝试':>6} {'关键路径':>8} {'状态':>8}")
|
||||||
|
lines.append(f" {'-' * 30} {'-' * 10} {'-' * 10} {'-' * 6} {'-' * 8} {'-' * 8}")
|
||||||
|
for t in self.tasks:
|
||||||
|
critical_flag = "✓" if t.is_on_critical_path else ""
|
||||||
|
lines.append(
|
||||||
|
f" {t.name:<30} {t.duration:>9.3f}s {t.wait_time:>9.3f}s {t.attempts:>6} "
|
||||||
|
f"{critical_flag:>8} {t.status.value:>8}",
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
lines.append(" (无)")
|
||||||
|
lines.append("")
|
||||||
|
lines.append("=" * 70)
|
||||||
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return (
|
||||||
|
f"ProfileReport(tasks={len(self.tasks)}, "
|
||||||
|
f"total={self.total_duration:.3f}s, "
|
||||||
|
f"critical={self.critical_path_duration:.3f}s, "
|
||||||
|
f"avg_par={self.avg_parallelism:.2f}, "
|
||||||
|
f"peak_par={self.peak_parallelism})"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------- #
|
||||||
|
# HTML 渲染(私有,零依赖)
|
||||||
|
# ---------------------------------------------------------------------- #
|
||||||
|
_HTML_TEMPLATE = """<!DOCTYPE html>
|
||||||
|
<html lang="zh-CN">
|
||||||
|
<head>
|
||||||
|
<meta charset="utf-8">
|
||||||
|
<meta name="viewport" content="width=device-width, initial-scale=1">
|
||||||
|
<title>PyFlowX 性能剖面报告</title>
|
||||||
|
<style>
|
||||||
|
:root {{
|
||||||
|
--bg: #f5f5f7;
|
||||||
|
--card: #ffffff;
|
||||||
|
--border: #d2d2d7;
|
||||||
|
--text: #1d1d1f;
|
||||||
|
--muted: #6e6e73;
|
||||||
|
--accent: #0071e3;
|
||||||
|
--success: #34c759;
|
||||||
|
--warning: #ff9f0a;
|
||||||
|
--danger: #ff3b30;
|
||||||
|
--critical: #af52de;
|
||||||
|
}}
|
||||||
|
* {{ box-sizing: border-box; }}
|
||||||
|
body {{
|
||||||
|
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
|
||||||
|
margin: 0;
|
||||||
|
padding: 24px;
|
||||||
|
background: var(--bg);
|
||||||
|
color: var(--text);
|
||||||
|
line-height: 1.5;
|
||||||
|
}}
|
||||||
|
h1 {{ margin: 0 0 8px; font-size: 28px; }}
|
||||||
|
h2 {{ margin: 32px 0 12px; font-size: 20px; border-bottom: 1px solid var(--border); padding-bottom: 6px; }}
|
||||||
|
.subtitle {{ color: var(--muted); margin: 0 0 24px; font-size: 14px; }}
|
||||||
|
.cards {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); gap: 12px; margin-bottom: 8px; }}
|
||||||
|
.card {{
|
||||||
|
background: var(--card);
|
||||||
|
border: 1px solid var(--border);
|
||||||
|
border-radius: 10px;
|
||||||
|
padding: 16px;
|
||||||
|
}}
|
||||||
|
.card .label {{ font-size: 12px; color: var(--muted); margin-bottom: 4px; text-transform: uppercase; letter-spacing: 0.5px; }}
|
||||||
|
.card .value {{ font-size: 22px; font-weight: 600; }}
|
||||||
|
.card .unit {{ font-size: 13px; color: var(--muted); margin-left: 2px; }}
|
||||||
|
.critical-path {{
|
||||||
|
background: var(--card);
|
||||||
|
border: 1px solid var(--border);
|
||||||
|
border-left: 4px solid var(--critical);
|
||||||
|
border-radius: 10px;
|
||||||
|
padding: 16px;
|
||||||
|
margin-bottom: 8px;
|
||||||
|
}}
|
||||||
|
.critical-path .label {{ font-size: 12px; color: var(--muted); margin-bottom: 8px; text-transform: uppercase; letter-spacing: 0.5px; }}
|
||||||
|
.critical-path .chain {{ font-family: ui-monospace, "SF Mono", Menlo, monospace; font-size: 13px; word-break: break-all; }}
|
||||||
|
.critical-path .arrow {{ color: var(--critical); margin: 0 6px; font-weight: 600; }}
|
||||||
|
/* 甘特图 */
|
||||||
|
.gantt {{
|
||||||
|
background: var(--card);
|
||||||
|
border: 1px solid var(--border);
|
||||||
|
border-radius: 10px;
|
||||||
|
padding: 16px;
|
||||||
|
overflow-x: auto;
|
||||||
|
}}
|
||||||
|
.gantt-row {{ display: flex; align-items: center; margin-bottom: 6px; min-width: 600px; }}
|
||||||
|
.gantt-label {{ width: 200px; flex-shrink: 0; font-size: 13px; font-family: ui-monospace, monospace; overflow: hidden; text-overflow: ellipsis; white-space: nowrap; }}
|
||||||
|
.gantt-track {{ flex: 1; height: 22px; background: #f0f0f3; border-radius: 4px; position: relative; }}
|
||||||
|
.gantt-bar {{ position: absolute; height: 100%; border-radius: 4px; min-width: 2px; }}
|
||||||
|
.gantt-bar.success {{ background: var(--success); }}
|
||||||
|
.gantt-bar.failed {{ background: var(--danger); }}
|
||||||
|
.gantt-bar.skipped {{ background: var(--muted); }}
|
||||||
|
.gantt-bar.critical {{ box-shadow: 0 0 0 2px var(--critical) inset; }}
|
||||||
|
.gantt-bar:hover {{ opacity: 0.85; }}
|
||||||
|
.gantt-tooltip {{ position: absolute; bottom: 100%; left: 50%; transform: translateX(-50%); background: #1d1d1f; color: #fff; padding: 4px 8px; border-radius: 4px; font-size: 11px; white-space: nowrap; opacity: 0; pointer-events: none; transition: opacity 0.15s; }}
|
||||||
|
.gantt-bar:hover .gantt-tooltip {{ opacity: 1; }}
|
||||||
|
/* 表格 */
|
||||||
|
table {{ width: 100%; border-collapse: collapse; background: var(--card); border-radius: 10px; overflow: hidden; border: 1px solid var(--border); }}
|
||||||
|
th, td {{ padding: 10px 12px; text-align: left; font-size: 13px; }}
|
||||||
|
th {{ background: #fafafa; font-weight: 600; color: var(--muted); text-transform: uppercase; font-size: 11px; letter-spacing: 0.5px; }}
|
||||||
|
tbody tr {{ border-top: 1px solid var(--border); }}
|
||||||
|
tbody tr:hover {{ background: #fafafa; }}
|
||||||
|
td.num {{ font-family: ui-monospace, monospace; text-align: right; }}
|
||||||
|
.badge {{ display: inline-block; padding: 2px 8px; border-radius: 10px; font-size: 11px; font-weight: 500; }}
|
||||||
|
.badge.success {{ background: rgba(52,199,89,0.15); color: var(--success); }}
|
||||||
|
.badge.failed {{ background: rgba(255,59,48,0.15); color: var(--danger); }}
|
||||||
|
.badge.skipped {{ background: rgba(110,110,115,0.15); color: var(--muted); }}
|
||||||
|
.star {{ color: var(--critical); font-weight: 700; }}
|
||||||
|
.footer {{ margin-top: 32px; color: var(--muted); font-size: 12px; text-align: center; }}
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<h1>PyFlowX 性能剖面报告</h1>
|
||||||
|
<p class="subtitle">由 <code>pxp</code> 生成 · {generated_at}</p>
|
||||||
|
|
||||||
|
<h2>图级指标</h2>
|
||||||
|
<div class="cards">
|
||||||
|
<div class="card"><div class="label">总耗时</div><div class="value">{total_duration:.3f}<span class="unit">s</span></div></div>
|
||||||
|
<div class="card"><div class="label">关键路径耗时</div><div class="value">{critical_duration:.3f}<span class="unit">s</span></div></div>
|
||||||
|
<div class="card"><div class="label">平均并行度</div><div class="value">{avg_par:.2f}</div></div>
|
||||||
|
<div class="card"><div class="label">峰值并行度</div><div class="value">{peak_par}</div></div>
|
||||||
|
<div class="card"><div class="label">并行度效率</div><div class="value">{efficiency:.1f}<span class="unit">%</span></div></div>
|
||||||
|
<div class="card"><div class="label">任务总数</div><div class="value">{task_count}</div></div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<h2>关键路径</h2>
|
||||||
|
<div class="critical-path">
|
||||||
|
<div class="label">最长依赖路径(串行瓶颈)</div>
|
||||||
|
<div class="chain">{critical_chain}</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<h2>任务时间线</h2>
|
||||||
|
<div class="gantt">
|
||||||
|
{gantt_rows}
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<h2>Top 瓶颈任务</h2>
|
||||||
|
<table>
|
||||||
|
<thead><tr><th>任务</th><th class="num">耗时</th><th class="num">等待</th><th class="num">尝试</th><th>关键路径</th><th>状态</th></tr></thead>
|
||||||
|
<tbody>
|
||||||
|
{bottleneck_rows}
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
|
||||||
|
<h2>全部任务</h2>
|
||||||
|
<table>
|
||||||
|
<thead><tr><th>任务</th><th class="num">耗时</th><th class="num">等待</th><th class="num">尝试</th><th>关键路径</th><th>状态</th><th>依赖</th></tr></thead>
|
||||||
|
<tbody>
|
||||||
|
{all_task_rows}
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
|
||||||
|
<div class="footer">由 PyFlowX · pxp 生成</div>
|
||||||
|
</body>
|
||||||
|
</html>"""
|
||||||
|
|
||||||
|
|
||||||
|
def _status_badge(status: TaskStatus) -> str:
|
||||||
|
"""生成状态徽章 HTML。"""
|
||||||
|
cls = status.value
|
||||||
|
return f'<span class="badge {cls}">{cls}</span>'
|
||||||
|
|
||||||
|
|
||||||
|
def _format_critical_chain(path: tuple[str, ...]) -> str:
|
||||||
|
"""格式化关键路径为 HTML 链。"""
|
||||||
|
if not path:
|
||||||
|
return '<em style="color:var(--muted)">(无)</em>'
|
||||||
|
arrow = '<span class="arrow">→</span>'
|
||||||
|
return arrow.join(f"<strong>{name}</strong>" for name in path)
|
||||||
|
|
||||||
|
|
||||||
|
def _render_gantt(profile: ProfileReport) -> str:
|
||||||
|
"""渲染甘特图行 HTML。
|
||||||
|
|
||||||
|
每个任务一行:标签 + 时间条。时间条位置基于 wait_time + 依赖关系
|
||||||
|
重建相对开始时间(相对最早任务起点),归一化到 0-100% 宽度。
|
||||||
|
SKIPPED 任务不显示(无时间戳)。
|
||||||
|
"""
|
||||||
|
visible = [t for t in profile.tasks if t.status != TaskStatus.SKIPPED and t.duration > 0]
|
||||||
|
if not visible:
|
||||||
|
return '<div style="color:var(--muted);padding:12px;">(无时间线数据)</div>'
|
||||||
|
|
||||||
|
# 重建相对开始时间:start[name] = max(end[dep]) + wait_time
|
||||||
|
# profile.tasks 已是拓扑序,可直接按序计算
|
||||||
|
start: dict[str, float] = {}
|
||||||
|
end: dict[str, float] = {}
|
||||||
|
for t in profile.tasks:
|
||||||
|
if t.status == TaskStatus.SKIPPED:
|
||||||
|
continue
|
||||||
|
dep_end = 0.0
|
||||||
|
for dep in t.deps:
|
||||||
|
dep_end = max(dep_end, end.get(dep, 0.0))
|
||||||
|
s = dep_end + t.wait_time
|
||||||
|
start[t.name] = s
|
||||||
|
end[t.name] = s + t.duration
|
||||||
|
|
||||||
|
# 归一化:以最早开始时间为 0,最晚结束为 100%
|
||||||
|
min_start = min(start.get(t.name, 0.0) for t in visible)
|
||||||
|
max_end = max(end.get(t.name, 0.0) for t in visible)
|
||||||
|
span = max_end - min_start
|
||||||
|
if span <= 0:
|
||||||
|
span = 1.0
|
||||||
|
|
||||||
|
rows: list[str] = []
|
||||||
|
for t in visible:
|
||||||
|
s = start.get(t.name, 0.0) - min_start
|
||||||
|
left_pct = (s / span) * 100
|
||||||
|
width_pct = (t.duration / span) * 100
|
||||||
|
cls = t.status.value
|
||||||
|
critical_cls = " critical" if t.is_on_critical_path else ""
|
||||||
|
tooltip = f"{t.name}: {t.duration:.3f}s @ +{s:.3f}s ({t.status.value})"
|
||||||
|
rows.append(
|
||||||
|
f' <div class="gantt-row">'
|
||||||
|
f'<div class="gantt-label" title="{t.name}">{t.name}</div>'
|
||||||
|
f'<div class="gantt-track">'
|
||||||
|
f'<div class="gantt-bar {cls}{critical_cls}" style="left:{left_pct:.2f}%;width:{width_pct:.2f}%">'
|
||||||
|
f'<span class="gantt-tooltip">{tooltip}</span>'
|
||||||
|
f"</div></div></div>"
|
||||||
|
)
|
||||||
|
return "\n".join(rows)
|
||||||
|
|
||||||
|
|
||||||
|
def _render_task_row(t: TaskProfile, show_deps: bool = False) -> str:
|
||||||
|
"""渲染任务表格行 HTML。"""
|
||||||
|
star = '<span class="star">★</span>' if t.is_on_critical_path else ""
|
||||||
|
deps = ", ".join(t.deps) if show_deps and t.deps else ""
|
||||||
|
deps_cell = f"<td>{deps}</td>" if show_deps else ""
|
||||||
|
return (
|
||||||
|
f" <tr>"
|
||||||
|
f"<td><code>{t.name}</code></td>"
|
||||||
|
f'<td class="num">{t.duration:.3f}s</td>'
|
||||||
|
f'<td class="num">{t.wait_time:.3f}s</td>'
|
||||||
|
f'<td class="num">{t.attempts}</td>'
|
||||||
|
f"<td>{star}</td>"
|
||||||
|
f"<td>{_status_badge(t.status)}</td>"
|
||||||
|
f"{deps_cell}"
|
||||||
|
f"</tr>"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _render_html(profile: ProfileReport) -> str:
|
||||||
|
"""渲染完整 HTML 报告。"""
|
||||||
|
from datetime import datetime as _dt
|
||||||
|
|
||||||
|
bottlenecks = profile.top_bottlenecks(5)
|
||||||
|
bottleneck_rows = (
|
||||||
|
"\n".join(_render_task_row(t) for t in bottlenecks)
|
||||||
|
or ' <tr><td colspan="6" style="color:var(--muted);">(无)</td></tr>'
|
||||||
|
)
|
||||||
|
all_task_rows = (
|
||||||
|
"\n".join(_render_task_row(t, show_deps=True) for t in profile.tasks)
|
||||||
|
or ' <tr><td colspan="7" style="color:var(--muted);">(无)</td></tr>'
|
||||||
|
)
|
||||||
|
|
||||||
|
return _HTML_TEMPLATE.format(
|
||||||
|
generated_at=_dt.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||||
|
total_duration=profile.total_duration,
|
||||||
|
critical_duration=profile.critical_path_duration,
|
||||||
|
avg_par=profile.avg_parallelism,
|
||||||
|
peak_par=profile.peak_parallelism,
|
||||||
|
efficiency=profile.parallelism_efficiency * 100,
|
||||||
|
task_count=len(profile.tasks),
|
||||||
|
critical_chain=_format_critical_chain(profile.critical_path),
|
||||||
|
gantt_rows=_render_gantt(profile),
|
||||||
|
bottleneck_rows=bottleneck_rows,
|
||||||
|
all_task_rows=all_task_rows,
|
||||||
|
)
|
||||||
@@ -0,0 +1,545 @@
|
|||||||
|
"""pxp 性能分析器测试.
|
||||||
|
|
||||||
|
覆盖策略:
|
||||||
|
* HTML 渲染:to_html() 输出结构正确,含关键章节。
|
||||||
|
* pxp CLI:参数解析、脚本执行、报告生成、浏览器调用、错误处理。
|
||||||
|
* hook 注入:捕获 px.run() 调用,还原原始函数。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import pyflowx as px
|
||||||
|
from pyflowx.cli import profiler
|
||||||
|
from pyflowx.profiling import ProfileReport
|
||||||
|
from pyflowx.report import RunReport
|
||||||
|
from pyflowx.task import TaskResult, TaskSpec, TaskStatus
|
||||||
|
|
||||||
|
|
||||||
|
def _fn() -> int:
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
def _spec(name: str, deps: tuple[str, ...] = ()) -> TaskSpec[Any]:
|
||||||
|
return TaskSpec[Any](name, _fn, depends_on=deps)
|
||||||
|
|
||||||
|
|
||||||
|
def _result(
|
||||||
|
name: str,
|
||||||
|
start: datetime,
|
||||||
|
duration: float,
|
||||||
|
*,
|
||||||
|
status: TaskStatus = TaskStatus.SUCCESS,
|
||||||
|
attempts: int = 1,
|
||||||
|
) -> TaskResult[Any]:
|
||||||
|
"""构造带时间戳的 TaskResult."""
|
||||||
|
end = start + timedelta(seconds=duration) if duration > 0 else start
|
||||||
|
return TaskResult[Any](
|
||||||
|
spec=_spec(name),
|
||||||
|
status=status,
|
||||||
|
value=None,
|
||||||
|
attempts=attempts,
|
||||||
|
started_at=start if duration > 0 or status != TaskStatus.SKIPPED else None,
|
||||||
|
finished_at=end if duration > 0 or status != TaskStatus.SKIPPED else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_simple_profile() -> ProfileReport:
|
||||||
|
"""构造一个简单的 ProfileReport 用于测试 HTML 输出."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = _result("b", start + timedelta(seconds=1), 2.0)
|
||||||
|
graph = px.Graph.from_specs([
|
||||||
|
_spec("a"),
|
||||||
|
_spec("b", deps=("a",)),
|
||||||
|
])
|
||||||
|
return ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
|
||||||
|
class TestToHtml:
|
||||||
|
"""测试 ProfileReport.to_html()."""
|
||||||
|
|
||||||
|
def test_to_html_contains_key_sections(self) -> None:
|
||||||
|
"""HTML 应包含所有关键章节标题。"""
|
||||||
|
profile = _build_simple_profile()
|
||||||
|
html = profile.to_html()
|
||||||
|
|
||||||
|
assert "<!DOCTYPE html>" in html
|
||||||
|
assert "PyFlowX 性能剖面报告" in html
|
||||||
|
assert "图级指标" in html
|
||||||
|
assert "关键路径" in html
|
||||||
|
assert "任务时间线" in html
|
||||||
|
assert "Top 瓶颈任务" in html
|
||||||
|
assert "全部任务" in html
|
||||||
|
|
||||||
|
def test_to_html_contains_metrics(self) -> None:
|
||||||
|
"""HTML 应包含图级指标数值。"""
|
||||||
|
profile = _build_simple_profile()
|
||||||
|
html = profile.to_html()
|
||||||
|
|
||||||
|
# 总耗时 3.0s (a=1 + b=2)
|
||||||
|
assert "3.000" in html
|
||||||
|
# 任务名
|
||||||
|
assert "a" in html
|
||||||
|
assert "b" in html
|
||||||
|
|
||||||
|
def test_to_html_contains_critical_path(self) -> None:
|
||||||
|
"""HTML 应包含关键路径任务链。"""
|
||||||
|
profile = _build_simple_profile()
|
||||||
|
html = profile.to_html()
|
||||||
|
|
||||||
|
# 关键路径是 a -> b
|
||||||
|
assert "<strong>a</strong>" in html
|
||||||
|
assert "<strong>b</strong>" in html
|
||||||
|
|
||||||
|
def test_to_html_contains_gantt_bars(self) -> None:
|
||||||
|
"""HTML 应包含甘特图条。"""
|
||||||
|
profile = _build_simple_profile()
|
||||||
|
html = profile.to_html()
|
||||||
|
|
||||||
|
assert "gantt-row" in html
|
||||||
|
assert "gantt-bar" in html
|
||||||
|
# 每个非 SKIPPED 任务一个条
|
||||||
|
assert html.count("gantt-bar") >= 2
|
||||||
|
|
||||||
|
def test_to_html_empty_profile(self) -> None:
|
||||||
|
"""空报告的 HTML 应不崩溃。"""
|
||||||
|
report = px.RunReport()
|
||||||
|
graph = px.Graph()
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
html = profile.to_html()
|
||||||
|
|
||||||
|
assert "PyFlowX 性能剖面报告" in html
|
||||||
|
assert "(无)" in html
|
||||||
|
|
||||||
|
def test_to_html_with_failed_task(self) -> None:
|
||||||
|
"""含 FAILED 任务的 HTML 应包含失败状态徽章。"""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0, status=TaskStatus.FAILED)
|
||||||
|
graph = px.Graph.from_specs([_spec("a")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
html = profile.to_html()
|
||||||
|
|
||||||
|
assert "failed" in html
|
||||||
|
assert "badge" in html
|
||||||
|
|
||||||
|
def test_to_html_with_skipped_task(self) -> None:
|
||||||
|
"""含 SKIPPED 任务的 HTML 不应在甘特图中显示该任务。"""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = TaskResult[Any](
|
||||||
|
spec=_spec("b"),
|
||||||
|
status=TaskStatus.SKIPPED,
|
||||||
|
reason="skip",
|
||||||
|
)
|
||||||
|
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
html = profile.to_html()
|
||||||
|
|
||||||
|
# SKIPPED 任务的徽章应出现
|
||||||
|
assert "skipped" in html
|
||||||
|
|
||||||
|
def test_to_html_self_contained(self) -> None:
|
||||||
|
"""HTML 应自包含(无外部依赖)。"""
|
||||||
|
profile = _build_simple_profile()
|
||||||
|
html = profile.to_html()
|
||||||
|
|
||||||
|
# 不引用外部资源
|
||||||
|
assert "<link" not in html
|
||||||
|
assert "<script src" not in html
|
||||||
|
|
||||||
|
|
||||||
|
class TestProfilerArgumentParsing:
|
||||||
|
"""测试 pxp CLI 参数解析。"""
|
||||||
|
|
||||||
|
def test_default_export_is_html(self) -> None:
|
||||||
|
"""默认导出格式为 html。"""
|
||||||
|
parser = profiler._build_parser()
|
||||||
|
args, remaining = parser.parse_known_args(["pymake.py"])
|
||||||
|
assert args.export == "html"
|
||||||
|
assert args.no_browser is False
|
||||||
|
assert args.output is None
|
||||||
|
assert remaining == ["pymake.py"]
|
||||||
|
|
||||||
|
def test_export_text(self) -> None:
|
||||||
|
"""-E text 应设置导出格式为 text。"""
|
||||||
|
parser = profiler._build_parser()
|
||||||
|
args, _ = parser.parse_known_args(["-E", "text", "pymake.py"])
|
||||||
|
assert args.export == "text"
|
||||||
|
|
||||||
|
def test_no_browser_flag(self) -> None:
|
||||||
|
"""--no-browser 应设置标志。"""
|
||||||
|
parser = profiler._build_parser()
|
||||||
|
args, _ = parser.parse_known_args(["--no-browser", "pymake.py"])
|
||||||
|
assert args.no_browser is True
|
||||||
|
|
||||||
|
def test_output_option(self) -> None:
|
||||||
|
"""-o 应设置输出路径。"""
|
||||||
|
parser = profiler._build_parser()
|
||||||
|
args, _ = parser.parse_known_args(["-o", "report.html", "pymake.py"])
|
||||||
|
assert args.output == "report.html"
|
||||||
|
|
||||||
|
def test_script_args_separated(self) -> None:
|
||||||
|
"""脚本参数应通过 remaining 分离。"""
|
||||||
|
parser = profiler._build_parser()
|
||||||
|
_, remaining = parser.parse_known_args(["pymake.py", "t", "--quiet"])
|
||||||
|
assert remaining == ["pymake.py", "t", "--quiet"]
|
||||||
|
|
||||||
|
def test_no_args_prints_help(
|
||||||
|
self,
|
||||||
|
capsys: pytest.CaptureFixture[str],
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
) -> None:
|
||||||
|
"""无参数应打印帮助并以退出码 2 退出。"""
|
||||||
|
monkeypatch.setattr(sys, "argv", ["pxp"])
|
||||||
|
with pytest.raises(SystemExit) as exc_info:
|
||||||
|
profiler.main()
|
||||||
|
assert exc_info.value.code == 2
|
||||||
|
captured = capsys.readouterr()
|
||||||
|
assert "usage" in captured.out.lower() or "usage" in captured.err.lower()
|
||||||
|
|
||||||
|
|
||||||
|
class TestCapturePxRun:
|
||||||
|
"""测试 _capture_px_run hook 注入。"""
|
||||||
|
|
||||||
|
def test_capture_captures_run_call(self) -> None:
|
||||||
|
"""hook 应捕获 px.run() 调用的 graph 和 report。"""
|
||||||
|
captured = profiler._capture_px_run()
|
||||||
|
try:
|
||||||
|
graph = px.Graph.from_specs([px.TaskSpec("a", lambda: 1)])
|
||||||
|
px.run(graph, strategy="sequential")
|
||||||
|
assert "graph" in captured
|
||||||
|
assert "report" in captured
|
||||||
|
assert captured["graph"] is graph
|
||||||
|
finally:
|
||||||
|
captured["_restore"]()
|
||||||
|
|
||||||
|
def test_capture_restores_original(self) -> None:
|
||||||
|
"""还原后 px.run 和 RunReport.__init__ 应恢复为原函数。"""
|
||||||
|
original_run = px.run
|
||||||
|
original_init = RunReport.__init__
|
||||||
|
captured = profiler._capture_px_run()
|
||||||
|
# 注入期间 px.run 和 RunReport.__init__ 已被替换
|
||||||
|
assert px.run is not original_run
|
||||||
|
assert RunReport.__init__ is not original_init
|
||||||
|
captured["_restore"]()
|
||||||
|
# 还原后恢复
|
||||||
|
assert px.run is original_run
|
||||||
|
assert RunReport.__init__ is original_init
|
||||||
|
|
||||||
|
def test_capture_via_runner_run(self) -> None:
|
||||||
|
"""hook 应捕获通过 CliRunner 执行的 run() 调用。"""
|
||||||
|
from pyflowx import runner as runner_mod
|
||||||
|
|
||||||
|
captured = profiler._capture_px_run()
|
||||||
|
try:
|
||||||
|
# 验证 runner.run 也被 patch(指向 patched_run)
|
||||||
|
assert runner_mod.run is px.executors.run
|
||||||
|
graph = px.Graph.from_specs([px.TaskSpec("a", lambda: 1)])
|
||||||
|
runner_mod.run(graph, strategy="sequential")
|
||||||
|
assert "report" in captured
|
||||||
|
finally:
|
||||||
|
captured["_restore"]()
|
||||||
|
|
||||||
|
def test_capture_captures_report_on_failure(self) -> None:
|
||||||
|
"""run() 抛出 TaskFailedError 时仍应捕获 report 实例。"""
|
||||||
|
from pyflowx.executors import TaskFailedError
|
||||||
|
|
||||||
|
def failing() -> None:
|
||||||
|
raise RuntimeError("boom")
|
||||||
|
|
||||||
|
graph = px.Graph.from_specs([px.TaskSpec("a", failing)])
|
||||||
|
captured = profiler._capture_px_run()
|
||||||
|
try:
|
||||||
|
with pytest.raises(TaskFailedError):
|
||||||
|
px.run(graph, strategy="sequential")
|
||||||
|
# 即使 run() 抛异常,report 也应被捕获(含已执行任务的结果)
|
||||||
|
assert "report" in captured
|
||||||
|
assert "graph" in captured
|
||||||
|
assert captured["graph"] is graph
|
||||||
|
finally:
|
||||||
|
captured["_restore"]()
|
||||||
|
|
||||||
|
|
||||||
|
class TestRunTargetScript:
|
||||||
|
"""测试 _run_target_script。"""
|
||||||
|
|
||||||
|
def test_run_simple_script(self, tmp_path: Path) -> None:
|
||||||
|
"""应能执行简单脚本并返回模块字典。"""
|
||||||
|
script = tmp_path / "simple.py"
|
||||||
|
script.write_text("x = 42\n", encoding="utf-8")
|
||||||
|
|
||||||
|
result = profiler._run_target_script(script, [])
|
||||||
|
assert result["x"] == 42
|
||||||
|
|
||||||
|
def test_run_script_with_sys_exit(self, tmp_path: Path) -> None:
|
||||||
|
"""脚本调用 sys.exit 应抛 SystemExit。"""
|
||||||
|
script = tmp_path / "exit.py"
|
||||||
|
script.write_text("import sys; sys.exit(0)\n", encoding="utf-8")
|
||||||
|
|
||||||
|
with pytest.raises(SystemExit):
|
||||||
|
profiler._run_target_script(script, [])
|
||||||
|
|
||||||
|
def test_run_script_sets_argv(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""应正确设置 sys.argv。"""
|
||||||
|
script = tmp_path / "argv.py"
|
||||||
|
script.write_text(
|
||||||
|
"import sys\nassert sys.argv[0] == __file__\nassert sys.argv[1:] == ['arg1', 'arg2']\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
profiler._run_target_script(script, ["arg1", "arg2"])
|
||||||
|
|
||||||
|
def test_run_script_adds_dir_to_path(self, tmp_path: Path) -> None:
|
||||||
|
"""脚本所在目录应加入 sys.path。"""
|
||||||
|
script = tmp_path / "pathcheck.py"
|
||||||
|
script.write_text(
|
||||||
|
"import sys, os\nassert os.path.dirname(__file__) in sys.path\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
profiler._run_target_script(script, [])
|
||||||
|
|
||||||
|
|
||||||
|
class TestOutputReport:
|
||||||
|
"""测试 _output_report。"""
|
||||||
|
|
||||||
|
def test_output_text_format(
|
||||||
|
self,
|
||||||
|
capsys: pytest.CaptureFixture[str],
|
||||||
|
) -> None:
|
||||||
|
"""text 格式应打印 describe() 到 stdout。"""
|
||||||
|
profile = _build_simple_profile()
|
||||||
|
profiler._output_report(profile, export="text", output=None, script_stem="test", no_browser=True)
|
||||||
|
captured = capsys.readouterr()
|
||||||
|
assert "PyFlowX 性能剖面报告" in captured.out
|
||||||
|
assert "图级指标" in captured.out
|
||||||
|
|
||||||
|
def test_output_html_default_filename(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""HTML 默认输出到 <script>_profile.html。"""
|
||||||
|
monkeypatch.chdir(tmp_path)
|
||||||
|
profile = _build_simple_profile()
|
||||||
|
profiler._output_report(profile, export="html", output=None, script_stem="mymake", no_browser=True)
|
||||||
|
|
||||||
|
out_file = tmp_path / "mymake_profile.html"
|
||||||
|
assert out_file.exists()
|
||||||
|
content = out_file.read_text(encoding="utf-8")
|
||||||
|
assert "PyFlowX 性能剖面报告" in content
|
||||||
|
|
||||||
|
def test_output_html_custom_path(self, tmp_path: Path) -> None:
|
||||||
|
"""HTML 应写入指定路径。"""
|
||||||
|
out_file = tmp_path / "custom.html"
|
||||||
|
profile = _build_simple_profile()
|
||||||
|
profiler._output_report(profile, export="html", output=str(out_file), script_stem="test", no_browser=True)
|
||||||
|
assert out_file.exists()
|
||||||
|
assert "PyFlowX" in out_file.read_text(encoding="utf-8")
|
||||||
|
|
||||||
|
def test_output_html_opens_browser(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""no_browser=False 应调用 webbrowser.open。"""
|
||||||
|
monkeypatch.chdir(tmp_path)
|
||||||
|
opened: list[str] = []
|
||||||
|
monkeypatch.setattr(profiler.webbrowser, "open", opened.append)
|
||||||
|
|
||||||
|
profile = _build_simple_profile()
|
||||||
|
profiler._output_report(profile, export="html", output=None, script_stem="test", no_browser=False)
|
||||||
|
|
||||||
|
assert len(opened) == 1
|
||||||
|
assert opened[0].startswith("file://")
|
||||||
|
|
||||||
|
def test_output_html_no_browser_flag(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""no_browser=True 不应调用 webbrowser.open。"""
|
||||||
|
monkeypatch.chdir(tmp_path)
|
||||||
|
opened: list[str] = []
|
||||||
|
monkeypatch.setattr(profiler.webbrowser, "open", opened.append)
|
||||||
|
|
||||||
|
profile = _build_simple_profile()
|
||||||
|
profiler._output_report(profile, export="html", output=None, script_stem="test", no_browser=True)
|
||||||
|
|
||||||
|
assert len(opened) == 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestProfilerMainIntegration:
|
||||||
|
"""main() 集成测试。"""
|
||||||
|
|
||||||
|
def test_main_analyses_script_with_px_run(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""main() 应分析含 px.run() 的脚本并生成 HTML。"""
|
||||||
|
script = tmp_path / "mytool.py"
|
||||||
|
script.write_text(
|
||||||
|
"import pyflowx as px\n"
|
||||||
|
"graph = px.Graph.from_specs([\n"
|
||||||
|
" px.TaskSpec('a', lambda: 1),\n"
|
||||||
|
" px.TaskSpec('b', lambda: 2, depends_on=('a',)),\n"
|
||||||
|
"])\n"
|
||||||
|
"px.run(graph, strategy='sequential')\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
out_file = tmp_path / "report.html"
|
||||||
|
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", "-o", str(out_file), str(script)])
|
||||||
|
|
||||||
|
profiler.main()
|
||||||
|
|
||||||
|
assert out_file.exists()
|
||||||
|
content = out_file.read_text(encoding="utf-8")
|
||||||
|
assert "PyFlowX 性能剖面报告" in content
|
||||||
|
assert "任务时间线" in content
|
||||||
|
|
||||||
|
def test_main_analyses_script_with_clirunner(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""main() 应分析含 CliRunner 的脚本。"""
|
||||||
|
script = tmp_path / "clirunner_tool.py"
|
||||||
|
script.write_text(
|
||||||
|
"import pyflowx as px\n"
|
||||||
|
"runner = px.CliRunner(\n"
|
||||||
|
" aliases={'t': px.TaskSpec('t', lambda: 1)},\n"
|
||||||
|
")\n"
|
||||||
|
"runner.run_cli(['t'])\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
out_file = tmp_path / "report.html"
|
||||||
|
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", "-o", str(out_file), str(script)])
|
||||||
|
|
||||||
|
profiler.main()
|
||||||
|
|
||||||
|
assert out_file.exists()
|
||||||
|
content = out_file.read_text(encoding="utf-8")
|
||||||
|
assert "PyFlowX 性能剖面报告" in content
|
||||||
|
|
||||||
|
def test_main_text_export(
|
||||||
|
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]
|
||||||
|
) -> None:
|
||||||
|
"""main() -E text 应输出文本到 stdout。"""
|
||||||
|
script = tmp_path / "simple.py"
|
||||||
|
script.write_text(
|
||||||
|
"import pyflowx as px\n"
|
||||||
|
"graph = px.Graph.from_specs([px.TaskSpec('a', lambda: 1)])\n"
|
||||||
|
"px.run(graph, strategy='sequential')\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(sys, "argv", ["pxp", "-E", "text", "--no-browser", str(script)])
|
||||||
|
|
||||||
|
profiler.main()
|
||||||
|
captured = capsys.readouterr()
|
||||||
|
assert "PyFlowX 性能剖面报告" in captured.out
|
||||||
|
|
||||||
|
def test_main_script_not_exist(
|
||||||
|
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]
|
||||||
|
) -> None:
|
||||||
|
"""脚本不存在应以退出码 2 退出。"""
|
||||||
|
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", str(tmp_path / "nonexistent.py")])
|
||||||
|
with pytest.raises(SystemExit) as exc_info:
|
||||||
|
profiler.main()
|
||||||
|
assert exc_info.value.code == 2
|
||||||
|
|
||||||
|
def test_main_no_px_run_captured(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""脚本未调用 px.run() 应以退出码 1 退出。"""
|
||||||
|
script = tmp_path / "no_run.py"
|
||||||
|
script.write_text("print('just printing')\n", encoding="utf-8")
|
||||||
|
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", str(script)])
|
||||||
|
with pytest.raises(SystemExit) as exc_info:
|
||||||
|
profiler.main()
|
||||||
|
assert exc_info.value.code == 1
|
||||||
|
|
||||||
|
def test_main_passes_script_args(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""应将脚本参数传递给目标脚本。"""
|
||||||
|
script = tmp_path / "argcheck.py"
|
||||||
|
script.write_text(
|
||||||
|
"import sys\n"
|
||||||
|
"assert sys.argv[1:] == ['myarg'], f'got {sys.argv[1:]}'\n"
|
||||||
|
"import pyflowx as px\n"
|
||||||
|
"px.run(px.Graph.from_specs([px.TaskSpec('a', lambda: 1)]), strategy='sequential')\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
out_file = tmp_path / "report.html"
|
||||||
|
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", "-o", str(out_file), str(script), "myarg"])
|
||||||
|
|
||||||
|
profiler.main() # 不抛异常即成功
|
||||||
|
|
||||||
|
def test_main_handles_script_exception(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""脚本抛异常时应捕获并继续生成报告(如果有 report)。"""
|
||||||
|
script = tmp_path / "raise.py"
|
||||||
|
script.write_text(
|
||||||
|
"import pyflowx as px\n"
|
||||||
|
"px.run(px.Graph.from_specs([px.TaskSpec('a', lambda: 1)]), strategy='sequential')\n"
|
||||||
|
"raise RuntimeError('after run')\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
out_file = tmp_path / "report.html"
|
||||||
|
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", "-o", str(out_file), str(script)])
|
||||||
|
|
||||||
|
profiler.main() # 不抛异常即成功
|
||||||
|
assert out_file.exists()
|
||||||
|
|
||||||
|
def test_main_auto_calls_main_when_no_main_block(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""脚本无 __main__ 块但定义了 main() 时应自动调用。"""
|
||||||
|
script = tmp_path / "no_main_block.py"
|
||||||
|
script.write_text(
|
||||||
|
"import pyflowx as px\n"
|
||||||
|
"def main():\n"
|
||||||
|
" px.run(px.Graph.from_specs([px.TaskSpec('a', lambda: 1)]), strategy='sequential')\n"
|
||||||
|
"# 无 if __name__ == '__main__' 块\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
out_file = tmp_path / "report.html"
|
||||||
|
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", "-o", str(out_file), str(script)])
|
||||||
|
|
||||||
|
profiler.main()
|
||||||
|
assert out_file.exists()
|
||||||
|
|
||||||
|
def test_main_auto_calls_main_with_clirunner(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""脚本无 __main__ 块但定义了调用 CliRunner 的 main() 时应自动调用。"""
|
||||||
|
script = tmp_path / "cli_tool.py"
|
||||||
|
script.write_text(
|
||||||
|
"import pyflowx as px\n"
|
||||||
|
"def main():\n"
|
||||||
|
" runner = px.CliRunner(\n"
|
||||||
|
" aliases={'t': px.TaskSpec('t', lambda: 1)},\n"
|
||||||
|
" )\n"
|
||||||
|
" runner.run_cli(['t'])\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
out_file = tmp_path / "report.html"
|
||||||
|
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", "-o", str(out_file), str(script), "t"])
|
||||||
|
|
||||||
|
profiler.main()
|
||||||
|
assert out_file.exists()
|
||||||
|
content = out_file.read_text(encoding="utf-8")
|
||||||
|
assert "PyFlowX 性能剖面报告" in content
|
||||||
|
|
||||||
|
def test_main_no_main_function_exits_with_1(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""脚本无 main() 且未调用 px.run() 应以退出码 1 退出。"""
|
||||||
|
script = tmp_path / "no_main.py"
|
||||||
|
script.write_text("x = 1\n", encoding="utf-8")
|
||||||
|
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", str(script)])
|
||||||
|
with pytest.raises(SystemExit) as exc_info:
|
||||||
|
profiler.main()
|
||||||
|
assert exc_info.value.code == 1
|
||||||
|
|
||||||
|
|
||||||
|
class TestTryCallMain:
|
||||||
|
"""测试 _try_call_main。"""
|
||||||
|
|
||||||
|
def test_calls_main_when_present(self) -> None:
|
||||||
|
"""模块字典含 main 可调用对象时应调用它。"""
|
||||||
|
called: list[bool] = []
|
||||||
|
|
||||||
|
def fake_main() -> None:
|
||||||
|
called.append(True)
|
||||||
|
|
||||||
|
profiler._try_call_main({"main": fake_main})
|
||||||
|
assert called == [True]
|
||||||
|
|
||||||
|
def test_no_main_does_nothing(self) -> None:
|
||||||
|
"""模块字典不含 main 时不应报错。"""
|
||||||
|
profiler._try_call_main({}) # 不抛异常即成功
|
||||||
|
|
||||||
|
def test_non_callable_main_does_nothing(self) -> None:
|
||||||
|
"""main 不是可调用对象时不应报错。"""
|
||||||
|
profiler._try_call_main({"main": "not a function"}) # 不抛异常即成功
|
||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@@ -123,6 +124,22 @@ class TestTaskSpecDefinitions:
|
|||||||
assert spec.cmd == ["tox", "-p", "auto"]
|
assert spec.cmd == ["tox", "-p", "auto"]
|
||||||
assert spec.skip_if_missing is False
|
assert spec.skip_if_missing is False
|
||||||
|
|
||||||
|
def test_all_tasks_have_correct_cwd(self) -> None:
|
||||||
|
"""所有任务应该有正确的 cwd 设置(指向项目根目录)."""
|
||||||
|
# 验证 ROOT_DIR 定义正确(向上三层到达项目根目录)
|
||||||
|
expected_root = Path(__file__).parent.parent.parent
|
||||||
|
assert expected_root == pymake.ROOT_DIR
|
||||||
|
|
||||||
|
# 验证 tasks 中的所有命令任务都有正确的 cwd
|
||||||
|
for spec in pymake.tasks:
|
||||||
|
if spec.cmd is not None:
|
||||||
|
assert spec.cwd == pymake.ROOT_DIR, f"任务 {spec.name} 的 cwd 应为 {pymake.ROOT_DIR}"
|
||||||
|
|
||||||
|
# 验证 aliases 中的内联任务(doc/lint/tox)也有正确的 cwd
|
||||||
|
for name in ("doc", "lint", "tox"):
|
||||||
|
spec = _find_task(name)
|
||||||
|
assert spec.cwd == pymake.ROOT_DIR, f"任务 {name} 的 cwd 应为 {pymake.ROOT_DIR}"
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------- #
|
# ---------------------------------------------------------------------- #
|
||||||
# main function
|
# main function
|
||||||
|
|||||||
@@ -0,0 +1,574 @@
|
|||||||
|
"""性能剖面(ProfileReport)测试.
|
||||||
|
|
||||||
|
覆盖策略:
|
||||||
|
* 构造带时间戳的 RunReport + Graph,验证关键路径、并行度、瓶颈排序。
|
||||||
|
* 边界场景:空报告、单任务、无时间戳、SKIPPED 任务、图校验失败。
|
||||||
|
* 输出格式:to_dict / describe / top_bottlenecks / critical_tasks。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pyflowx as px
|
||||||
|
from pyflowx.profiling import ProfileReport, TaskProfile
|
||||||
|
from pyflowx.task import TaskResult, TaskSpec, TaskStatus
|
||||||
|
|
||||||
|
|
||||||
|
def _fn() -> int:
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
def _spec(name: str, deps: tuple[str, ...] = ()) -> TaskSpec[Any]:
|
||||||
|
return TaskSpec[Any](name, _fn, depends_on=deps)
|
||||||
|
|
||||||
|
|
||||||
|
def _result(
|
||||||
|
name: str,
|
||||||
|
start: datetime,
|
||||||
|
duration: float,
|
||||||
|
*,
|
||||||
|
status: TaskStatus = TaskStatus.SUCCESS,
|
||||||
|
attempts: int = 1,
|
||||||
|
) -> TaskResult[Any]:
|
||||||
|
"""构造带时间戳的 TaskResult."""
|
||||||
|
end = start + timedelta(seconds=duration) if duration > 0 else start
|
||||||
|
return TaskResult[Any](
|
||||||
|
spec=_spec(name),
|
||||||
|
status=status,
|
||||||
|
value=None,
|
||||||
|
attempts=attempts,
|
||||||
|
started_at=start if duration > 0 or status != TaskStatus.SKIPPED else None,
|
||||||
|
finished_at=end if duration > 0 or status != TaskStatus.SKIPPED else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _skipped_result(name: str, reason: str = "skip") -> TaskResult[Any]:
|
||||||
|
"""构造 SKIPPED 结果(无时间戳)."""
|
||||||
|
return TaskResult[Any](
|
||||||
|
spec=_spec(name),
|
||||||
|
status=TaskStatus.SKIPPED,
|
||||||
|
reason=reason,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestProfileReportConstruction:
|
||||||
|
"""测试 ProfileReport 构建."""
|
||||||
|
|
||||||
|
def test_empty_report(self) -> None:
|
||||||
|
"""空报告应产生空剖面."""
|
||||||
|
report = px.RunReport()
|
||||||
|
graph = px.Graph()
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
assert len(profile.tasks) == 0
|
||||||
|
assert profile.total_duration == 0.0
|
||||||
|
assert profile.critical_path == ()
|
||||||
|
assert profile.critical_path_duration == 0.0
|
||||||
|
assert profile.avg_parallelism == 0.0
|
||||||
|
assert profile.peak_parallelism == 0
|
||||||
|
|
||||||
|
def test_single_task(self) -> None:
|
||||||
|
"""单任务:关键路径就是它自己,并行度为 1."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.5)
|
||||||
|
graph = px.Graph.from_specs([_spec("a")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
assert len(profile.tasks) == 1
|
||||||
|
assert profile.tasks[0].name == "a"
|
||||||
|
assert profile.tasks[0].duration == 1.5
|
||||||
|
assert profile.tasks[0].is_on_critical_path
|
||||||
|
assert profile.total_duration == 1.5
|
||||||
|
assert profile.critical_path == ("a",)
|
||||||
|
assert profile.critical_path_duration == 1.5
|
||||||
|
assert profile.avg_parallelism == 1.0
|
||||||
|
assert profile.peak_parallelism == 1
|
||||||
|
assert profile.parallelism_efficiency == 1.0
|
||||||
|
|
||||||
|
def test_serial_chain(self) -> None:
|
||||||
|
"""串行链 a -> b -> c:关键路径为全部,效率 100%."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = _result("b", start + timedelta(seconds=1), 2.0)
|
||||||
|
report.results["c"] = _result("c", start + timedelta(seconds=3), 1.5)
|
||||||
|
graph = px.Graph.from_specs([
|
||||||
|
_spec("a"),
|
||||||
|
_spec("b", deps=("a",)),
|
||||||
|
_spec("c", deps=("b",)),
|
||||||
|
])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
assert profile.total_duration == 4.5
|
||||||
|
assert profile.critical_path_duration == 4.5
|
||||||
|
assert profile.critical_path == ("a", "b", "c")
|
||||||
|
assert profile.parallelism_efficiency == 1.0
|
||||||
|
assert profile.peak_parallelism == 1
|
||||||
|
assert profile.avg_parallelism == 1.0
|
||||||
|
|
||||||
|
def test_parallel_tasks(self) -> None:
|
||||||
|
"""并行任务 a, b 同时执行:关键路径取较长者,效率 < 1."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = _result("b", start, 2.0)
|
||||||
|
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
# wall-clock = 2.0, 关键路径 = 2.0 (b), 效率 = 1.0
|
||||||
|
# 因为关键路径定义就是最长路径,与 wall-clock 相同
|
||||||
|
assert profile.total_duration == 2.0
|
||||||
|
assert profile.critical_path_duration == 2.0
|
||||||
|
assert profile.critical_path == ("b",)
|
||||||
|
assert profile.peak_parallelism == 2
|
||||||
|
# 平均并行度 = (1.0 + 2.0) / 2.0 = 1.5
|
||||||
|
assert profile.avg_parallelism == 1.5
|
||||||
|
|
||||||
|
def test_parallel_with_join(self) -> None:
|
||||||
|
"""a, b 并行后 join 到 c:关键路径 a->c 或 b->c."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = _result("b", start, 3.0)
|
||||||
|
report.results["c"] = _result("c", start + timedelta(seconds=3), 1.0)
|
||||||
|
graph = px.Graph.from_specs([
|
||||||
|
_spec("a"),
|
||||||
|
_spec("b"),
|
||||||
|
_spec("c", deps=("a", "b")),
|
||||||
|
])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
# 关键路径 = b -> c (3 + 1 = 4)
|
||||||
|
assert profile.critical_path_duration == 4.0
|
||||||
|
assert profile.critical_path == ("b", "c")
|
||||||
|
assert profile.tasks[0].is_on_critical_path is False # a 不在关键路径
|
||||||
|
# task("b") 在关键路径上
|
||||||
|
assert profile.task("b").is_on_critical_path
|
||||||
|
assert profile.task("c").is_on_critical_path
|
||||||
|
|
||||||
|
def test_skipped_task_no_timestamp(self) -> None:
|
||||||
|
"""SKIPPED 任务无时间戳:不影响并行度计算."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = _skipped_result("b")
|
||||||
|
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
# b 是 SKIPPED,duration=0
|
||||||
|
assert profile.task("b").status == TaskStatus.SKIPPED
|
||||||
|
assert profile.task("b").duration == 0.0
|
||||||
|
assert profile.peak_parallelism == 1 # 只有 a 在跑
|
||||||
|
|
||||||
|
|
||||||
|
class TestWaitTime:
|
||||||
|
"""测试等待时间计算."""
|
||||||
|
|
||||||
|
def test_no_deps_zero_wait(self) -> None:
|
||||||
|
"""无依赖任务等待时间为 0."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
graph = px.Graph.from_specs([_spec("a")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
assert profile.task("a").wait_time == 0.0
|
||||||
|
|
||||||
|
def test_wait_after_dep_completes(self) -> None:
|
||||||
|
"""b 在 a 完成后等待 0.5s 才开始."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = _result("b", start + timedelta(seconds=1.5), 1.0)
|
||||||
|
graph = px.Graph.from_specs([
|
||||||
|
_spec("a"),
|
||||||
|
_spec("b", deps=("a",)),
|
||||||
|
])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
assert profile.task("b").wait_time == 0.5
|
||||||
|
|
||||||
|
def test_wait_negative_clamped_to_zero(self) -> None:
|
||||||
|
"""b 在 a 完成前就开始(异常情况)应钳制为 0."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 2.0)
|
||||||
|
# b 在 a 还没完成时就开始(不应该但可能发生)
|
||||||
|
report.results["b"] = _result("b", start + timedelta(seconds=1), 1.0)
|
||||||
|
graph = px.Graph.from_specs([
|
||||||
|
_spec("a"),
|
||||||
|
_spec("b", deps=("a",)),
|
||||||
|
])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
# a 在 t=2 结束,b 在 t=1 开始,delta = -1,钳制为 0
|
||||||
|
assert profile.task("b").wait_time == 0.0
|
||||||
|
|
||||||
|
def test_skipped_task_zero_wait(self) -> None:
|
||||||
|
"""SKIPPED 任务等待时间为 0."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = _skipped_result("b")
|
||||||
|
graph = px.Graph.from_specs([
|
||||||
|
_spec("a"),
|
||||||
|
_spec("b", deps=("a",)),
|
||||||
|
])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
assert profile.task("b").wait_time == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
class TestCriticalPath:
|
||||||
|
"""测试关键路径分析."""
|
||||||
|
|
||||||
|
def test_diamond_dependency(self) -> None:
|
||||||
|
"""菱形依赖:a -> b -> d, a -> c -> d,关键路径取较长分支."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = _result("b", start + timedelta(seconds=1), 3.0)
|
||||||
|
report.results["c"] = _result("c", start + timedelta(seconds=1), 1.0)
|
||||||
|
report.results["d"] = _result("d", start + timedelta(seconds=4), 1.0)
|
||||||
|
graph = px.Graph.from_specs([
|
||||||
|
_spec("a"),
|
||||||
|
_spec("b", deps=("a",)),
|
||||||
|
_spec("c", deps=("a",)),
|
||||||
|
_spec("d", deps=("b", "c")),
|
||||||
|
])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
# 关键路径:a -> b -> d = 1 + 3 + 1 = 5
|
||||||
|
assert profile.critical_path_duration == 5.0
|
||||||
|
assert profile.critical_path == ("a", "b", "d")
|
||||||
|
|
||||||
|
def test_graph_validation_failure_returns_empty(self) -> None:
|
||||||
|
"""图校验失败(有环)应回退为空关键路径."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
# 手动构造带环的图(绕过校验)
|
||||||
|
graph = px.Graph()
|
||||||
|
graph.specs["a"] = _spec("a", deps=("b",))
|
||||||
|
graph.specs["b"] = _spec("b", deps=("a",))
|
||||||
|
graph.deps["a"] = ("b",)
|
||||||
|
graph.deps["b"] = ("a",)
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
# layers() 抛 CycleError,回退为空
|
||||||
|
assert profile.critical_path == ()
|
||||||
|
assert profile.critical_path_duration == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
class TestParallelism:
|
||||||
|
"""测试并行度计算."""
|
||||||
|
|
||||||
|
def test_no_timestamps_zero_parallelism(self) -> None:
|
||||||
|
"""所有任务无时间戳:并行度为 0."""
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = TaskResult[Any](spec=_spec("a"), status=TaskStatus.SUCCESS)
|
||||||
|
graph = px.Graph.from_specs([_spec("a")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
assert profile.avg_parallelism == 0.0
|
||||||
|
assert profile.peak_parallelism == 0
|
||||||
|
|
||||||
|
def test_zero_duration_excluded(self) -> None:
|
||||||
|
"""零耗时任务(end <= start)不参与并行度计算."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 0.0) # 零耗时
|
||||||
|
report.results["b"] = _result("b", start, 1.0)
|
||||||
|
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
# 只有 b 参与,峰值 = 1
|
||||||
|
assert profile.peak_parallelism == 1
|
||||||
|
|
||||||
|
def test_skipped_with_timestamps_excluded(self) -> None:
|
||||||
|
"""SKIPPED 任务即使带时间戳也不参与并行度计算."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
# SKIPPED 但带时间戳(异常但可能发生)
|
||||||
|
report.results["a"] = _result("a", start, 1.0, status=TaskStatus.SKIPPED)
|
||||||
|
report.results["b"] = _result("b", start, 1.0)
|
||||||
|
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
# a 是 SKIPPED,被排除;只有 b 参与
|
||||||
|
assert profile.peak_parallelism == 1
|
||||||
|
|
||||||
|
def test_peak_parallelism_three_tasks(self) -> None:
|
||||||
|
"""三个任务完全重叠:峰值并行度 = 3."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 3.0)
|
||||||
|
report.results["b"] = _result("b", start, 3.0)
|
||||||
|
report.results["c"] = _result("c", start, 3.0)
|
||||||
|
graph = px.Graph.from_specs([_spec("a"), _spec("b"), _spec("c")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
assert profile.peak_parallelism == 3
|
||||||
|
assert profile.avg_parallelism == 3.0
|
||||||
|
|
||||||
|
|
||||||
|
class TestQueries:
|
||||||
|
"""测试查询方法."""
|
||||||
|
|
||||||
|
def test_task_lookup(self) -> None:
|
||||||
|
"""task(name) 应返回对应剖面."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = _result("b", start, 2.0)
|
||||||
|
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
assert profile.task("a").name == "a"
|
||||||
|
assert profile.task("b").duration == 2.0
|
||||||
|
|
||||||
|
def test_task_lookup_not_found(self) -> None:
|
||||||
|
"""task(name) 不存在应抛 KeyError."""
|
||||||
|
report = px.RunReport()
|
||||||
|
graph = px.Graph()
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
try:
|
||||||
|
profile.task("missing")
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise AssertionError("应抛出 KeyError")
|
||||||
|
|
||||||
|
def test_top_bottlenecks(self) -> None:
|
||||||
|
"""top_bottlenecks 应按耗时降序返回."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = _result("b", start, 3.0)
|
||||||
|
report.results["c"] = _result("c", start, 2.0)
|
||||||
|
graph = px.Graph.from_specs([_spec("a"), _spec("b"), _spec("c")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
top3 = profile.top_bottlenecks(3)
|
||||||
|
assert len(top3) == 3
|
||||||
|
assert top3[0].name == "b"
|
||||||
|
assert top3[1].name == "c"
|
||||||
|
assert top3[2].name == "a"
|
||||||
|
|
||||||
|
def test_top_bottlenecks_zero_or_negative(self) -> None:
|
||||||
|
"""n <= 0 应返回空元组."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
graph = px.Graph.from_specs([_spec("a")])
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
assert profile.top_bottlenecks(0) == ()
|
||||||
|
assert profile.top_bottlenecks(-1) == ()
|
||||||
|
|
||||||
|
def test_critical_tasks(self) -> None:
|
||||||
|
"""critical_tasks 应返回关键路径上的任务(按路径顺序)."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = _result("b", start + timedelta(seconds=1), 3.0)
|
||||||
|
report.results["c"] = _result("c", start + timedelta(seconds=1), 1.0)
|
||||||
|
report.results["d"] = _result("d", start + timedelta(seconds=4), 1.0)
|
||||||
|
graph = px.Graph.from_specs([
|
||||||
|
_spec("a"),
|
||||||
|
_spec("b", deps=("a",)),
|
||||||
|
_spec("c", deps=("a",)),
|
||||||
|
_spec("d", deps=("b", "c")),
|
||||||
|
])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
# 关键路径 a -> b -> d
|
||||||
|
critical = profile.critical_tasks()
|
||||||
|
assert len(critical) == 3
|
||||||
|
assert [t.name for t in critical] == ["a", "b", "d"]
|
||||||
|
|
||||||
|
def test_failed_tasks(self) -> None:
|
||||||
|
"""failed_tasks 应返回 FAILED 状态的任务."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0, status=TaskStatus.FAILED)
|
||||||
|
report.results["b"] = _result("b", start, 1.0)
|
||||||
|
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
failed = profile.failed_tasks()
|
||||||
|
assert len(failed) == 1
|
||||||
|
assert failed[0].name == "a"
|
||||||
|
|
||||||
|
def test_skipped_tasks(self) -> None:
|
||||||
|
"""skipped_tasks 应返回 SKIPPED 状态的任务."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
report.results["b"] = _skipped_result("b")
|
||||||
|
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
skipped = profile.skipped_tasks()
|
||||||
|
assert len(skipped) == 1
|
||||||
|
assert skipped[0].name == "b"
|
||||||
|
|
||||||
|
|
||||||
|
class TestOutputFormats:
|
||||||
|
"""测试输出格式."""
|
||||||
|
|
||||||
|
def test_to_dict_structure(self) -> None:
|
||||||
|
"""to_dict 应返回包含所有字段的字典."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.5)
|
||||||
|
graph = px.Graph.from_specs([_spec("a")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
d = profile.to_dict()
|
||||||
|
|
||||||
|
assert "tasks" in d
|
||||||
|
assert "total_duration_seconds" in d
|
||||||
|
assert "critical_path_duration_seconds" in d
|
||||||
|
assert "critical_path" in d
|
||||||
|
assert "avg_parallelism" in d
|
||||||
|
assert "peak_parallelism" in d
|
||||||
|
assert "parallelism_efficiency" in d
|
||||||
|
assert "bottlenecks" in d
|
||||||
|
assert len(d["tasks"]) == 1
|
||||||
|
assert d["tasks"][0]["name"] == "a"
|
||||||
|
assert d["tasks"][0]["status"] == "success"
|
||||||
|
assert d["tasks"][0]["duration_seconds"] == 1.5
|
||||||
|
assert d["tasks"][0]["is_on_critical_path"] is True
|
||||||
|
|
||||||
|
def test_describe_contains_key_sections(self) -> None:
|
||||||
|
"""describe 应包含关键章节标题."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
graph = px.Graph.from_specs([_spec("a")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
text = profile.describe()
|
||||||
|
|
||||||
|
assert "PyFlowX 性能剖面报告" in text
|
||||||
|
assert "【图级指标】" in text
|
||||||
|
assert "【关键路径】" in text
|
||||||
|
assert "【Top" in text
|
||||||
|
assert "【全部任务】" in text
|
||||||
|
assert "a" in text
|
||||||
|
|
||||||
|
def test_describe_empty_report(self) -> None:
|
||||||
|
"""空报告的 describe 应不崩溃且包含章节标题."""
|
||||||
|
report = px.RunReport()
|
||||||
|
graph = px.Graph()
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
text = profile.describe()
|
||||||
|
|
||||||
|
assert "【图级指标】" in text
|
||||||
|
assert "(无)" in text
|
||||||
|
|
||||||
|
def test_repr(self) -> None:
|
||||||
|
"""__repr__ 应包含关键指标."""
|
||||||
|
start = datetime(2024, 1, 1, 0, 0, 0)
|
||||||
|
report = px.RunReport()
|
||||||
|
report.results["a"] = _result("a", start, 1.0)
|
||||||
|
graph = px.Graph.from_specs([_spec("a")])
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
r = repr(profile)
|
||||||
|
|
||||||
|
assert "ProfileReport" in r
|
||||||
|
assert "tasks=1" in r
|
||||||
|
assert "total=1.000s" in r
|
||||||
|
|
||||||
|
def test_task_profile_to_dict(self) -> None:
|
||||||
|
"""TaskProfile.to_dict 应返回正确字段."""
|
||||||
|
tp = TaskProfile(
|
||||||
|
name="x",
|
||||||
|
status=TaskStatus.SUCCESS,
|
||||||
|
duration=1.5,
|
||||||
|
attempts=2,
|
||||||
|
wait_time=0.3,
|
||||||
|
is_on_critical_path=True,
|
||||||
|
deps=("a", "b"),
|
||||||
|
)
|
||||||
|
d = tp.to_dict()
|
||||||
|
|
||||||
|
assert d["name"] == "x"
|
||||||
|
assert d["status"] == "success"
|
||||||
|
assert d["duration_seconds"] == 1.5
|
||||||
|
assert d["attempts"] == 2
|
||||||
|
assert d["wait_time_seconds"] == 0.3
|
||||||
|
assert d["is_on_critical_path"] is True
|
||||||
|
assert d["deps"] == ["a", "b"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestIntegrationWithRun:
|
||||||
|
"""与真实 run() 集成测试."""
|
||||||
|
|
||||||
|
def test_profile_from_real_run(self) -> None:
|
||||||
|
"""从真实 run() 结果构建剖面."""
|
||||||
|
import time
|
||||||
|
|
||||||
|
def slow() -> int:
|
||||||
|
time.sleep(0.01) # 确保任务有实际耗时,避免 duration 极小导致并行度计算为 0
|
||||||
|
return 1
|
||||||
|
|
||||||
|
graph = px.Graph.from_specs([
|
||||||
|
px.TaskSpec("a", slow),
|
||||||
|
px.TaskSpec("b", slow, depends_on=("a",)),
|
||||||
|
px.TaskSpec("c", slow, depends_on=("a",)),
|
||||||
|
])
|
||||||
|
report = px.run(graph, strategy="sequential")
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
assert len(profile.tasks) == 3
|
||||||
|
# sequential 策略下应为串行,duration > 0
|
||||||
|
assert profile.critical_path_duration > 0
|
||||||
|
# sequential 策略下并行度应为 1
|
||||||
|
assert profile.peak_parallelism == 1
|
||||||
|
|
||||||
|
def test_profile_from_thread_run(self) -> None:
|
||||||
|
"""从 thread 策略 run() 结果构建剖面,验证并行度 > 1."""
|
||||||
|
import time
|
||||||
|
|
||||||
|
def slow() -> int:
|
||||||
|
time.sleep(0.05)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
graph = px.Graph.from_specs([
|
||||||
|
px.TaskSpec("a", slow),
|
||||||
|
px.TaskSpec("b", slow),
|
||||||
|
px.TaskSpec("c", slow),
|
||||||
|
])
|
||||||
|
report = px.run(graph, strategy="thread", max_workers=3)
|
||||||
|
|
||||||
|
profile = ProfileReport.from_report(report, graph)
|
||||||
|
|
||||||
|
# 三个任务并行,峰值应 >= 2(可能因调度时机不到 3)
|
||||||
|
assert profile.peak_parallelism >= 2
|
||||||
|
assert profile.critical_path_duration > 0
|
||||||
+2
-2
@@ -193,8 +193,8 @@ def test_should_execute_skip_if_missing_cmd_not_found() -> None:
|
|||||||
|
|
||||||
def test_should_execute_skip_if_missing_cmd_found() -> None:
|
def test_should_execute_skip_if_missing_cmd_found() -> None:
|
||||||
"""skip_if_missing 但命令存在时应执行."""
|
"""skip_if_missing 但命令存在时应执行."""
|
||||||
# 使用 Python 作为已安装的命令
|
# 使用 Python 作为已安装的命令(Windows 上 echo 是 shell 内置,shutil.which 找不到)
|
||||||
spec = TaskSpec("a", cmd=["echo"], skip_if_missing=True) # echo 应存在
|
spec = TaskSpec("a", cmd=["python"], skip_if_missing=True) # python 应存在
|
||||||
should_run, reason = spec.should_execute({})
|
should_run, reason = spec.should_execute({})
|
||||||
assert should_run is True
|
assert should_run is True
|
||||||
assert reason is None
|
assert reason is None
|
||||||
|
|||||||
@@ -5603,7 +5603,7 @@ pycountry = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pyflowx"
|
name = "pyflowx"
|
||||||
version = "0.2.11"
|
version = "0.2.13"
|
||||||
source = { editable = "." }
|
source = { editable = "." }
|
||||||
dependencies = [
|
dependencies = [
|
||||||
{ name = "graphlib-backport", marker = "python_full_version < '3.9'" },
|
{ name = "graphlib-backport", marker = "python_full_version < '3.9'" },
|
||||||
|
|||||||
Reference in New Issue
Block a user