9 Commits

Author SHA1 Message Date
zhou 0205baeab6 Merge remote-tracking branch 'origin/main' into develop
CI / Lint, Typecheck & Test (push) Failing after 32s
2026-07-02 13:37:35 +08:00
zhou bbcf80e0ca ci: 合并CI工作流并更新发布流程
CI / Lint, Typecheck & Test (push) Has been cancelled
将原有的lint、typecheck和测试工作流合并为单个CI任务,重构release工作流以支持Gitea发布,简化pypi发布步骤
2026-07-02 13:37:10 +08:00
zhou 5293831165 Merge pull request 'feat(cli/dev/envdev): 为Linux环境添加Docker安装配置相关任务' (#1) from develop into main
CI / Lint & Typecheck (push) Failing after 30s
CI / Test (ubuntu-latest) (push) Failing after 30s
CI / Test (macos-latest) (push) Has been cancelled
CI / Test (windows-latest) (push) Has been cancelled
Reviewed-on: #1
2026-07-02 05:26:08 +00:00
zhou 87606d152a feat(cli/dev/envdev): 为Linux环境添加Docker安装配置相关任务
CI / Lint & Typecheck (push) Failing after 6m4s
CI / Test (ubuntu-latest) (push) Failing after 1m31s
CI / Test (macos-latest) (push) Has been cancelled
CI / Test (windows-latest) (push) Has been cancelled
新增Linux系统下安装docker-compose-v2、添加用户到docker组以及刷新docker用户组的任务流程,完善开发环境配置步骤
2026-07-02 10:58:12 +08:00
zhou 6f93e6eb6d bump version to 0.3.0
Release / build (push) Failing after 31s
Release / release (push) Has been skipped
Release / publish-pypi (push) Has been skipped
CI / Test (macos-latest) (push) Has been cancelled
CI / Test (ubuntu-latest) (push) Has been cancelled
CI / Test (windows-latest) (push) Has been cancelled
CI / Lint & Typecheck (push) Has been cancelled
2026-06-28 21:38:37 +08:00
zhou 43e1aad1fe chore: 发布版本0.2.13并完善任务执行环境配置
本次提交更新了版本号至0.2.13,同时完成多项改进:
1.  在.gitignore中新增忽略性能分析文件*_profile.html
2.  修复测试用例中echo命令在Windows下无法被正确检测的问题,改用python命令
3.  优化测试用例确保性能统计数据有效,添加耗时模拟函数
4.  为所有CLI任务统一配置项目根目录作为工作目录,解决跨平台执行路径问题
5.  新增测试验证所有任务的cwd配置正确性
2026-06-28 21:38:18 +08:00
zhou 467634f8c7 bump version to 0.2.13
Release / build (push) Failing after 11m59s
Release / release (push) Has been skipped
Release / publish-pypi (push) Has been skipped
2026-06-28 20:30:54 +08:00
zhou ce31f60441 feat(cli): add pxp performance profiler command
1. 新增pxp CLI工具用于分析PyFlowX脚本生成性能报告
2. 新增ProfileReport.to_html方法生成自包含HTML报告
3. 新增完整的profiler功能测试用例
4. 更新pyproject.toml添加pxp入口点
5. 版本升级至0.2.12
2026-06-28 20:30:17 +08:00
zhou 3d6d769685 feat(profiling): 添加工作流性能分析模块与测试用例
新增了性能剖面分析能力,支持从运行报告生成任务级、图级性能指标,包括关键路径、并行度分析和瓶颈识别,同时补充了完整的单元测试覆盖。
2026-06-28 19:59:25 +08:00
14 changed files with 2198 additions and 78 deletions
+5 -24
View File
@@ -9,34 +9,12 @@ concurrency:
cancel-in-progress: true cancel-in-progress: true
jobs: jobs:
lint-and-typecheck: ci:
name: Lint & Typecheck name: Lint, Typecheck & Test
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v5
with:
enable-cache: true
- uses: actions/setup-python@v5
with:
python-version: '3.13'
- run: uv sync
- run: uv run ruff check src tests
- run: uv run pyrefly check .
test:
name: Test (${{ matrix.os }})
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v5 - uses: astral-sh/setup-uv@v5
with: with:
enable-cache: true enable-cache: true
@@ -47,4 +25,7 @@ jobs:
3.8 3.8
3.13 3.13
- run: uv sync
- run: uv run ruff check src tests
- run: uv run pyrefly check .
- run: uvx tox run -e py38,py313 - run: uvx tox run -e py38,py313
+20 -34
View File
@@ -6,13 +6,11 @@ on:
permissions: permissions:
contents: write contents: write
id-token: write
jobs: jobs:
build: release:
name: Build, Publish & Release
runs-on: ubuntu-latest runs-on: ubuntu-latest
outputs:
version: ${{ steps.version.outputs.version }}
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
@@ -26,36 +24,24 @@ jobs:
- run: uv build - run: uv build
- id: version
run: echo "version=${GITHUB_REF#refs/tags/v}" >> $GITHUB_OUTPUT
- uses: actions/upload-artifact@v7
with:
name: dist
path: dist/
publish-pypi:
needs: build
runs-on: ubuntu-latest
environment: pypi
steps:
- uses: actions/download-artifact@v8
with:
name: dist
path: dist
- uses: pypa/gh-action-pypi-publish@release/v1 - uses: pypa/gh-action-pypi-publish@release/v1
release:
needs: [build, publish-pypi]
runs-on: ubuntu-latest
steps:
- uses: actions/download-artifact@v8
with: with:
name: dist password: ${{ secrets.PYPI_API_TOKEN }}
path: dist
- uses: softprops/action-gh-release@v2 - name: Create Gitea Release
with: run: |
files: dist/* VERSION=${GITHUB_REF#refs/tags/v}
generate_release_notes: true REPO=${GITHUB_REPOSITORY}
UPLOAD_URL=$(curl -s -X POST "https://git.gookeryoung.cn/api/v1/repos/${REPO}/releases" \
-H "Authorization: token ${{ secrets.GITEA_TOKEN }}" \
-H "Content-Type: application/json" \
-d "{\"tag_name\": \"v${VERSION}\", \"name\": \"v${VERSION}\", \"draft\": false, \"prerelease\": false}" \
| jq -r '.upload_url')
for file in dist/*; do
curl -s -X POST "${UPLOAD_URL}?name=$(basename $file)" \
-H "Authorization: token ${{ secrets.GITEA_TOKEN }}" \
-H "Content-Type: application/octet-stream" \
--data-binary @"$file"
done
env:
GITEA_URL: ${{ secrets.GITEA_URL || 'https://git.gookeryoung.cn' }}
+1
View File
@@ -10,3 +10,4 @@ wheels/
.venv .venv
.coverage .coverage
.idea .idea
*_profile.html
+2 -1
View File
@@ -21,7 +21,7 @@ license = { text = "MIT" }
name = "pyflowx" name = "pyflowx"
readme = "README.md" readme = "README.md"
requires-python = ">=3.8" requires-python = ">=3.8"
version = "0.2.12" version = "0.3.0"
[project.scripts] [project.scripts]
autofmt = "pyflowx.cli.autofmt:main" autofmt = "pyflowx.cli.autofmt:main"
@@ -38,6 +38,7 @@ packtool = "pyflowx.cli.packtool:main"
pdftool = "pyflowx.cli.pdftool:main" pdftool = "pyflowx.cli.pdftool:main"
piptool = "pyflowx.cli.piptool:main" piptool = "pyflowx.cli.piptool:main"
pymake = "pyflowx.cli.pymake:main" pymake = "pyflowx.cli.pymake:main"
pxp = "pyflowx.cli.profiler:main"
reseticon = "pyflowx.cli.reseticoncache:main" reseticon = "pyflowx.cli.reseticoncache:main"
scrcap = "pyflowx.cli.screenshot:main" scrcap = "pyflowx.cli.screenshot:main"
sglang = "pyflowx.cli.llm.sglang:main" sglang = "pyflowx.cli.llm.sglang:main"
+4 -1
View File
@@ -82,6 +82,7 @@ from .errors import (
) )
from .executors import Strategy, run from .executors import Strategy, run
from .graph import Graph, GraphDefaults from .graph import Graph, GraphDefaults
from .profiling import ProfileReport, TaskProfile
from .report import RunReport from .report import RunReport
from .runner import CliExitCode, CliRunner from .runner import CliExitCode, CliRunner
from .storage import JSONBackend, MemoryBackend, StateBackend from .storage import JSONBackend, MemoryBackend, StateBackend
@@ -99,7 +100,7 @@ from .task import (
task_template, task_template,
) )
__version__ = "0.3.6" __version__ = "0.4.0"
__all__ = [ __all__ = [
"IS_LINUX", "IS_LINUX",
@@ -122,6 +123,7 @@ __all__ = [
"JSONBackend", "JSONBackend",
"MemoryBackend", "MemoryBackend",
"MissingDependencyError", "MissingDependencyError",
"ProfileReport",
"PyFlowXError", "PyFlowXError",
"RetryPolicy", "RetryPolicy",
"RunReport", "RunReport",
@@ -132,6 +134,7 @@ __all__ = [
"TaskEvent", "TaskEvent",
"TaskFailedError", "TaskFailedError",
"TaskHooks", "TaskHooks",
"TaskProfile",
"TaskResult", "TaskResult",
"TaskSpec", "TaskSpec",
"TaskStatus", "TaskStatus",
+26
View File
@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import getpass
from pathlib import Path from pathlib import Path
from typing import Literal, get_args from typing import Literal, get_args
@@ -254,6 +255,31 @@ def main() -> None:
allow_upstream_skip=True, allow_upstream_skip=True,
verbose=True, verbose=True,
), ),
# 安装 Docker
px.TaskSpec(
"install_docker",
cmd=["sudo", "apt", "install", "-y", "docker-compose-v2"],
conditions=(BuiltinConditions.IS_LINUX(),),
depends_on=("install_mirror",),
allow_upstream_skip=True,
verbose=True,
),
px.TaskSpec(
"add_docker_group",
cmd=["sudo", "usermod", "-aG", "docker", getpass.getuser()],
conditions=(BuiltinConditions.IS_LINUX(),),
depends_on=("install_docker",),
allow_upstream_skip=True,
verbose=True,
),
px.TaskSpec(
"refresh_docker_group",
cmd=["newgrp", "docker"],
conditions=(BuiltinConditions.IS_LINUX(),),
depends_on=("add_docker_group",),
allow_upstream_skip=True,
verbose=True,
),
# 设置 Python 环境变量 # 设置 Python 环境变量
*setenv_group({ *setenv_group({
"PIP_INDEX_URL": PIP_INDEX_URLS[python_mirror], "PIP_INDEX_URL": PIP_INDEX_URLS[python_mirror],
+272
View File
@@ -0,0 +1,272 @@
"""pxp —— PyFlowX 性能分析器.
分析包含 ``px`` 调用的 Python 脚本,生成工作流执行性能剖面报告。
工作原理
--------
1. 注入 hookmonkey-patch ``pyflowx.run`` / ``pyflowx.executors.run`` /
``pyflowx.runner.run``,捕获最后一次执行的 ``Graph`` 与 ``RunReport``。
2. 执行目标脚本:用 ``runpy.run_path`` 以 ``__main__`` 身份执行,
捕获 ``SystemExit``(脚本可能调 ``sys.exit``)。
3. 生成报告:从捕获的 report + graph 构建 :class:`ProfileReport`
默认输出 HTML 并自动打开浏览器。
使用方式
--------
# 分析 pymake.py,生成 HTML 报告并打开浏览器
pxp pymake.py
# 传递参数给被分析脚本(用 -- 分隔)
pxp pymake.py -- t
# 指定输出文件
pxp pymake.py -o report.html
# 不打开浏览器
pxp pymake.py --no-browser
# 输出纯文本报告
pxp pymake.py -E text
"""
from __future__ import annotations
__all__ = ["main"]
import argparse
import runpy
import sys
import webbrowser
from pathlib import Path
from typing import Any
from .. import executors as _executors
from .. import runner as _runner
from ..profiling import ProfileReport
from ..report import RunReport
def _build_parser() -> argparse.ArgumentParser:
"""构建参数解析器。"""
parser = argparse.ArgumentParser(
prog="pxp",
description="PyFlowX 性能分析器:分析包含 px 调用的脚本,生成性能剖面报告。",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"示例:\n"
" pxp pymake.py # 分析并打开 HTML 报告\n"
" pxp pymake.py -- t # 传递参数 t 给脚本\n"
" pxp pymake.py -E text # 输出纯文本报告\n"
" pxp pymake.py -o out.html # 指定输出文件\n"
),
)
_ = parser.add_argument(
"--export",
"-E",
choices=["html", "text"],
default="html",
help="导出格式(默认: html",
)
_ = parser.add_argument(
"--no-browser",
action="store_true",
help="不自动打开浏览器(仅 HTML 格式有效)",
)
_ = parser.add_argument(
"-o",
"--output",
help="输出文件路径(默认: <script>_profile.html",
)
return parser
def _capture_px_run() -> dict[str, Any]:
"""注入 hook 捕获 px.run() 调用。
返回一个字典,``run()`` 执行后填充 ``graph`` 与 ``report``。
同时返回还原函数用于 finally 块。
Note
-----
需同时 patch 三处引用:
* ``pyflowx.executors.run`` —— 实际实现
* ``pyflowx.runner.run`` —— ``CliRunner`` 直接 import 的引用
* ``pyflowx.run`` —— 顶层包导出的引用(用户脚本常用 ``px.run()``
另外 patch ``RunReport.__init__`` 以捕获 ``run()`` 内部创建的 report 实例。
这对于 ``run()`` 抛出 ``TaskFailedError`` 的场景至关重要:此时 ``run()``
不会正常返回 report,但 report 对象已在内部创建并填充了已执行任务的结果。
通过 ``capture_enabled`` 标志确保只在 ``patched_run`` 调用期间捕获。
"""
captured: dict[str, Any] = {}
original_exec_run = _executors.run
original_runner_run = _runner.run
# 惰性获取顶层 pyflowx.run 引用(避免循环导入)
import pyflowx as px_mod
original_px_run = px_mod.run
original_report_init = RunReport.__init__
capture_enabled = [False]
def patched_report_init(self: RunReport, *args: Any, **kwargs: Any) -> None:
original_report_init(self, *args, **kwargs)
if capture_enabled[0]:
captured["report"] = self
RunReport.__init__ = patched_report_init # type: ignore[assignment]
def patched_run(graph: Any, *args: Any, **kwargs: Any) -> RunReport:
captured["graph"] = graph
capture_enabled[0] = True
try:
report = original_exec_run(graph, *args, **kwargs)
# 正常返回时确保 captured["report"] 是返回的 report
captured["report"] = report
return report
finally:
capture_enabled[0] = False
# patch 所有引用 run 的入口
_executors.run = patched_run # type: ignore[assignment]
_runner.run = patched_run # type: ignore[assignment]
px_mod.run = patched_run # type: ignore[assignment]
def _restore() -> None:
_executors.run = original_exec_run # type: ignore[assignment]
_runner.run = original_runner_run # type: ignore[assignment]
px_mod.run = original_px_run # type: ignore[assignment]
RunReport.__init__ = original_report_init # type: ignore[assignment]
captured["_restore"] = _restore
return captured
def _run_target_script(script: Path, script_args: list[str]) -> dict[str, Any]:
"""执行目标脚本。
将脚本所在目录加入 ``sys.path``,设置 ``sys.argv``,然后用
``runpy.run_path`` 以 ``__main__`` 身份执行。捕获 ``SystemExit``。
Returns
-------
dict[str, Any]
脚本模块的全局变量字典(含 ``main`` 等定义)。
"""
sys.argv = [str(script), *script_args]
script_dir = str(script.parent.resolve())
if script_dir not in sys.path:
sys.path.insert(0, script_dir)
return runpy.run_path(str(script), run_name="__main__")
def _try_call_main(module_globals: dict[str, Any]) -> None:
"""若模块定义了 ``main`` 可调用对象,调用它。
用于脚本无 ``if __name__ == "__main__"`` 块的场景(如通过 entry points
注册的 CLI 工具脚本)。``main`` 通常调用 ``CliRunner.run_cli()``
后者读取 ``sys.argv[1:]`` 执行对应命令。
"""
main_fn = module_globals.get("main")
if callable(main_fn):
main_fn()
def _output_report(
profile: ProfileReport,
export: str,
output: str | None,
script_stem: str,
no_browser: bool,
) -> None:
"""输出性能报告。"""
if export == "text":
print(profile.describe())
return
# HTML 格式
html = profile.to_html()
if output:
out_path = Path(output)
else:
out_path = Path.cwd() / f"{script_stem}_profile.html"
out_path.write_text(html, encoding="utf-8")
print(f"HTML 报告已生成: {out_path}")
if not no_browser:
try:
webbrowser.open(f"file://{out_path.resolve()}")
except Exception as e:
print(f"警告:无法打开浏览器: {e}", file=sys.stderr)
def main() -> None:
"""pxp CLI 入口。"""
parser = _build_parser()
pxp_args, remaining = parser.parse_known_args()
if not remaining:
parser.print_help()
sys.exit(2)
script_str = remaining[0]
script_args = remaining[1:]
script_path = Path(script_str).resolve()
if not script_path.is_file():
print(f"错误:脚本不存在: {script_path}", file=sys.stderr)
sys.exit(2)
# 注入 hook
captured = _capture_px_run()
# 执行目标脚本
print(f"正在分析: {script_path}")
if script_args:
print(f"脚本参数: {script_args}")
print("-" * 60)
module_globals: dict[str, Any] = {}
try:
module_globals = _run_target_script(script_path, script_args)
except SystemExit:
# 脚本调用了 sys.exit,正常情况
pass
except Exception as e:
print(f"警告:脚本执行抛出异常: {e}", file=sys.stderr)
# 若脚本执行未捕获到 run(),尝试调用模块的 main() 函数
# (适用于无 ``if __name__ == "__main__"`` 块的 CLI 脚本)
if captured.get("report") is None and module_globals:
try:
_try_call_main(module_globals)
except SystemExit:
pass
except Exception as e:
print(f"警告:调用 main() 抛出异常: {e}", file=sys.stderr)
# 还原 hook
restore = captured.pop("_restore", None)
if restore is not None:
restore()
# 检查是否捕获到 run() 调用
report = captured.get("report")
graph = captured.get("graph")
if report is None or graph is None:
print("错误:未捕获到 px.run() 调用,无法生成性能报告", file=sys.stderr)
print("请确保脚本通过 px.run() 或 CliRunner 执行任务流图。", file=sys.stderr)
sys.exit(1)
# 生成报告
profile = ProfileReport.from_report(report, graph)
_output_report(
profile,
export=pxp_args.export,
output=pxp_args.output,
script_stem=script_path.stem,
no_browser=pxp_args.no_browser,
)
if __name__ == "__main__":
main()
+24 -15
View File
@@ -6,39 +6,48 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path
import pyflowx as px import pyflowx as px
from pyflowx.conditions import Constants from pyflowx.conditions import Constants
# 项目根目录(pymake.py 在 src/pyflowx/cli,向上四层到达根目录)
ROOT_DIR = Path(__file__).parent.parent.parent.parent
MATURIN_BUILD_COMMAND = ["maturin", "build", "-r"] MATURIN_BUILD_COMMAND = ["maturin", "build", "-r"]
if Constants.IS_WINDOWS: if Constants.IS_WINDOWS:
MATURIN_BUILD_COMMAND.extend(["--target", "x86_64-win7-windows-msvc", "-Zbuild-std", "-i", "python3.8"]) MATURIN_BUILD_COMMAND.extend(["--target", "x86_64-win7-windows-msvc", "-Zbuild-std", "-i", "python3.8"])
# 扁平注册所有任务(px.cmd 自动从命令前两段推导 name) # 扁平注册所有任务(px.cmd 自动从命令前两段推导 name)
# 所有任务指定 cwd=ROOT_DIR,确保在项目根目录执行
tasks: list[px.TaskSpec] = [ tasks: list[px.TaskSpec] = [
px.cmd(["uv", "build"]), px.cmd(["uv", "build"], cwd=ROOT_DIR),
px.cmd(MATURIN_BUILD_COMMAND), px.cmd(MATURIN_BUILD_COMMAND, cwd=ROOT_DIR),
px.cmd(["uv", "sync"]), px.cmd(["uv", "sync"], cwd=ROOT_DIR),
px.cmd(["gitt", "c"], name="git_clean"), px.cmd(["gitt", "c"], name="git_clean", cwd=ROOT_DIR),
px.cmd( px.cmd(
["pytest", "-m", "not slow", "-n", "8", "--dist", "loadfile", "--color=yes", "--durations=10"], ["pytest", "-m", "not slow", "-n", "8", "--dist", "loadfile", "--color=yes", "--durations=10"],
name="test", name="test",
cwd=ROOT_DIR,
), ),
px.cmd( px.cmd(
["pytest", "-m", "not slow", "--dist", "loadfile", "--color=yes", "--durations=10"], ["pytest", "-m", "not slow", "--dist", "loadfile", "--color=yes", "--durations=10"],
name="test_fast", name="test_fast",
cwd=ROOT_DIR,
), ),
px.cmd( px.cmd(
["pytest", "--cov", "-n", "8", "--dist", "loadfile", "--tb=short", "-v", "--color=yes", "--durations=10"], ["pytest", "--cov", "-n", "8", "--dist", "loadfile", "--tb=short", "-v", "--color=yes", "--durations=10"],
name="test_coverage", name="test_coverage",
cwd=ROOT_DIR,
), ),
px.cmd(["pyrefly", "check", "."]), px.cmd(["pyrefly", "check", "."], cwd=ROOT_DIR),
px.cmd(["git", "add", "-A"], name="git_add_all"), px.cmd(["git", "add", "-A"], name="git_add_all", cwd=ROOT_DIR),
px.cmd(["bumpversion"]), px.cmd(["bumpversion"], cwd=ROOT_DIR),
px.cmd(["bumpversion", "minor"]), px.cmd(["bumpversion", "minor"], cwd=ROOT_DIR),
px.cmd(["git", "push"]), px.cmd(["git", "push"], cwd=ROOT_DIR),
px.cmd(["git", "push", "--tags"], name="git_push_tags"), px.cmd(["git", "push", "--tags"], name="git_push_tags", cwd=ROOT_DIR),
px.cmd(["hatch", "publish"], name="publish_python"), px.cmd(["hatch", "publish"], name="publish_python", cwd=ROOT_DIR),
px.cmd(["twine", "upload", "--disable-progress-bar"], name="twine_publish"), px.cmd(["twine", "upload", "--disable-progress-bar"], name="twine_publish", cwd=ROOT_DIR),
] ]
# 单任务别名(alias 名与任务名相同):直接内联 TaskSpec,避免 str 自引用 # 单任务别名(alias 名与任务名相同):直接内联 TaskSpec,避免 str 自引用
@@ -55,13 +64,13 @@ aliases: dict[str, str | list[str | px.TaskSpec] | px.TaskSpec | px.Graph] = {
"bump": ["c", "tc", "git_add_all", "bumpversion"], "bump": ["c", "tc", "git_add_all", "bumpversion"],
"bumpmi": "bumpversion_minor", "bumpmi": "bumpversion_minor",
"cov": ["git_clean", "test_coverage"], "cov": ["git_clean", "test_coverage"],
"doc": px.cmd(["sphinx-build", "-b", "html", "docs", "docs/_build"], name="doc"), "doc": px.cmd(["sphinx-build", "-b", "html", "docs", "docs/_build"], name="doc", cwd=ROOT_DIR),
"lint": px.cmd(["ruff", "check", "--fix", "--unsafe-fixes"], name="lint"), "lint": px.cmd(["ruff", "check", "--fix", "--unsafe-fixes"], name="lint", cwd=ROOT_DIR),
"pb": ["twine_publish", "publish_python"], "pb": ["twine_publish", "publish_python"],
"t": "test", "t": "test",
"tf": "test_fast", "tf": "test_fast",
"tc": ["pyrefly_check", "lint"], "tc": ["pyrefly_check", "lint"],
"tox": px.cmd(["tox", "-p", "auto"], name="tox"), "tox": px.cmd(["tox", "-p", "auto"], name="tox", cwd=ROOT_DIR),
# 发布命令 # 发布命令
"p": ["git_clean", "git_push", "git_push_tags"], "p": ["git_clean", "git_push", "git_push_tags"],
} }
+705
View File
@@ -0,0 +1,705 @@
"""工作流执行性能评估。
基于 :class:`~pyflowx.report.RunReport` 中已有的 ``started_at`` /
``finished_at`` 时间戳进行离线分析,**零运行时开销**——不修改执行流程,
不注册回调,不引入额外计时器。
核心指标
--------
* **任务级**:每个任务的 wall-clock 耗时、状态、重试次数、等待时间
(从最早依赖完成到本任务开始)。
* **图级**:总耗时(wall-clock)、关键路径耗时(理论最短耗时)、
并行度效率(关键路径耗时 / 总耗时)。
* **关键路径**:从源点到汇点的最长依赖路径,识别真正的串行瓶颈。
* **并行度**:基于时间线重叠计算瞬时并行度,给出平均并行度与峰值并行度。
* **瓶颈识别**:按耗时排序的 Top-N 任务。
设计原则
--------
* 数据来源于 ``RunReport`` + ``Graph``,无副作用。
* 计算复杂度 O(V+E):拓扑排序 + 单次松弛,适合大规模图。
* 所有时间戳用 ``datetime``,与 :class:`TaskResult` 保持一致。
快速上手
--------
import pyflowx as px
report = px.run(graph)
profile = px.ProfileReport.from_report(report, graph)
print(profile.describe())
bottlenecks = profile.top_bottlenecks(3)
"""
from __future__ import annotations
__all__ = [
"ProfileReport",
"TaskProfile",
]
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from .graph import Graph
from .report import RunReport
from .task import TaskResult, TaskStatus
@dataclass(frozen=True)
class TaskProfile:
"""单个任务的性能剖面。
属性
----
name:
任务名。
status:
终态(SUCCESS/FAILED/SKIPPED)。
duration:
wall-clock 执行耗时(秒)。SKIPPED 任务为 0.0。
attempts:
尝试次数(含首次)。
wait_time:
从最早硬依赖完成到本任务开始的等待时间(秒)。
无硬依赖或 SKIPPED 时为 0.0。
is_on_critical_path:
是否位于关键路径上。
deps:
硬依赖任务名列表。
"""
name: str
status: TaskStatus
duration: float
attempts: int
wait_time: float
is_on_critical_path: bool
deps: tuple[str, ...]
def to_dict(self) -> dict[str, Any]:
"""转为 JSON 友好的字典。"""
return {
"name": self.name,
"status": self.status.value,
"duration_seconds": round(self.duration, 6),
"attempts": self.attempts,
"wait_time_seconds": round(self.wait_time, 6),
"is_on_critical_path": self.is_on_critical_path,
"deps": list(self.deps),
}
@dataclass(frozen=True)
class ProfileReport:
"""工作流执行的性能剖面报告。
通过 :meth:`from_report` 从 :class:`RunReport` + :class:`Graph` 构建。
所有字段在构造时一次性计算完毕,后续访问为 O(1)。
"""
tasks: tuple[TaskProfile, ...]
"""所有任务的性能剖面(按拓扑序)。"""
total_duration: float
"""整次运行的 wall-clock 耗时(秒)。"""
critical_path_duration: float
"""关键路径耗时(秒):从最早任务开始到最晚任务结束的最长依赖路径。"""
critical_path: tuple[str, ...]
"""关键路径上的任务名序列(按执行顺序)。"""
avg_parallelism: float
"""平均并行度 = 任务总耗时 / wall-clock 总耗时。"""
peak_parallelism: int
"""峰值并行度:任一时刻同时运行的任务数最大值。"""
parallelism_efficiency: float
"""并行度效率 = 关键路径耗时 / wall-clock 总耗时。``1.0`` 表示完全串行,
越大表示并行化收益越低(瓶颈在关键路径上)。"""
# ------------------------------------------------------------------ #
# 构建
# ------------------------------------------------------------------ #
@classmethod
def from_report(cls, report: RunReport, graph: Graph) -> ProfileReport:
"""从运行报告与图构建性能剖面。
参数
----
report:
已完成的 :class:`RunReport`,需包含 ``started_at``/``finished_at``。
graph:
对应的 :class:`Graph`,用于依赖关系与关键路径分析。
Note
-----
本方法不修改 ``report`` 或 ``graph``,纯函数式计算。
"""
task_profiles = cls._build_task_profiles(report, graph)
total_duration = cls._calc_total_duration(report)
critical_path, critical_duration = cls._calc_critical_path(graph, report)
avg_par, peak_par = cls._calc_parallelism(report)
efficiency = critical_duration / total_duration if total_duration > 0 else 0.0
# 标记关键路径上的任务
critical_set = set(critical_path)
marked = tuple(
TaskProfile(
name=t.name,
status=t.status,
duration=t.duration,
attempts=t.attempts,
wait_time=t.wait_time,
is_on_critical_path=t.name in critical_set,
deps=t.deps,
)
for t in task_profiles
)
return cls(
tasks=marked,
total_duration=total_duration,
critical_path_duration=critical_duration,
critical_path=critical_path,
avg_parallelism=avg_par,
peak_parallelism=peak_par,
parallelism_efficiency=efficiency,
)
@staticmethod
def _build_task_profiles(report: RunReport, graph: Graph) -> tuple[TaskProfile, ...]:
"""构建每个任务的性能剖面。"""
profiles: list[TaskProfile] = []
for name, result in report.results.items():
spec = graph.specs.get(name)
deps = tuple(spec.depends_on) if spec is not None else ()
duration = result.duration or 0.0
wait_time = ProfileReport._calc_wait_time(result, deps, report)
profiles.append(
TaskProfile(
name=name,
status=result.status,
duration=duration,
attempts=result.attempts,
wait_time=wait_time,
is_on_critical_path=False, # 后续标记
deps=deps,
)
)
return tuple(profiles)
@staticmethod
def _calc_wait_time(
result: TaskResult[Any],
deps: tuple[str, ...],
report: RunReport,
) -> float:
"""计算等待时间:从最早依赖完成到本任务开始。
无硬依赖、SKIPPED 任务或时间戳缺失时返回 0.0。
"""
if not deps or result.started_at is None or result.status == TaskStatus.SKIPPED:
return 0.0
# 找出所有已完成依赖的最晚完成时间
dep_end_times: list[datetime] = []
for dep in deps:
dep_result = report.results.get(dep)
if dep_result is not None and dep_result.finished_at is not None:
dep_end_times.append(dep_result.finished_at)
if not dep_end_times:
return 0.0
latest_dep_end = max(dep_end_times)
delta = (result.started_at - latest_dep_end).total_seconds()
return max(0.0, delta)
@staticmethod
def _calc_total_duration(report: RunReport) -> float:
"""计算 wall-clock 总耗时:最早开始到最晚结束。"""
starts: list[datetime] = []
ends: list[datetime] = []
for r in report.results.values():
if r.started_at is not None:
starts.append(r.started_at)
if r.finished_at is not None:
ends.append(r.finished_at)
if not starts or not ends:
return 0.0
return (max(ends) - min(starts)).total_seconds()
@staticmethod
def _calc_critical_path(graph: Graph, report: RunReport) -> tuple[tuple[str, ...], float]:
"""计算关键路径:DAG 最长路径(按实际执行耗时)。
使用拓扑排序 + 动态规划,O(V+E)。SKIPPED 任务耗时按 0 计。
"""
# 构建耗时映射
durations: dict[str, float] = {}
for name, result in report.results.items():
durations[name] = result.duration or 0.0
# 拓扑序(使用 graph.layers 保证与分层一致)
try:
layers = graph.layers()
except Exception:
# 图校验失败时回退为空
return (), 0.0
# earliest_finish[name] = duration[name] + max(earliest_finish[dep] for dep in deps)
earliest_finish: dict[str, float] = {}
predecessor: dict[str, str | None] = {}
for layer in layers:
for name in layer:
spec = graph.specs.get(name)
deps = spec.depends_on if spec is not None else ()
if not deps:
earliest_finish[name] = durations.get(name, 0.0)
predecessor[name] = None
else:
best_dep: str | None = None
best_ef = 0.0
for dep in deps:
ef = earliest_finish.get(dep, 0.0)
if ef >= best_ef:
best_ef = ef
best_dep = dep
earliest_finish[name] = best_ef + durations.get(name, 0.0)
predecessor[name] = best_dep
if not earliest_finish:
return (), 0.0
# 找到 earliest_finish 最大的节点作为终点
end_node = max(earliest_finish, key=lambda n: earliest_finish[n])
total = earliest_finish[end_node]
# 回溯关键路径
path: list[str] = []
node: str | None = end_node
while node is not None:
path.append(node)
node = predecessor.get(node)
path.reverse()
return tuple(path), total
@staticmethod
def _calc_parallelism(report: RunReport) -> tuple[float, int]:
"""计算平均并行度与峰值并行度。
基于时间线扫描:将每个任务的 [started_at, finished_at] 区间
转为事件点(+1/-1),排序后扫描得到瞬时并行度序列。
返回 (avg_parallelism, peak_parallelism)。
无有效时间戳时返回 (0.0, 0)。
"""
events: list[tuple[float, int]] = [] # (timestamp, delta)
for r in report.results.values():
if r.started_at is None or r.finished_at is None:
continue
if r.status == TaskStatus.SKIPPED:
continue
start_ts = r.started_at.timestamp()
end_ts = r.finished_at.timestamp()
if end_ts <= start_ts:
continue
events.append((start_ts, 1))
events.append((end_ts, -1))
if not events:
return 0.0, 0
# 排序:同一时间点先处理结束(-1)再处理开始(+1),避免虚假峰值
events.sort(key=lambda e: (e[0], e[1]))
current = 0
peak = 0
# 加权面积用于计算平均并行度
area = 0.0
prev_ts = events[0][0]
for ts, delta in events:
if ts > prev_ts:
area += current * (ts - prev_ts)
current += delta
peak = max(peak, current)
prev_ts = ts
total_span = events[-1][0] - events[0][0]
avg = area / total_span if total_span > 0 else 0.0
return avg, peak
# ------------------------------------------------------------------ #
# 查询
# ------------------------------------------------------------------ #
def task(self, name: str) -> TaskProfile:
"""返回指定任务的剖面。不存在则 ``KeyError``。"""
for t in self.tasks:
if t.name == name:
return t
raise KeyError(name)
def top_bottlenecks(self, n: int = 5) -> tuple[TaskProfile, ...]:
"""返回耗时最长的 Top-N 任务(按 duration 降序)。
参数
----
n:
返回数量。``n <= 0`` 返回空元组。
"""
if n <= 0:
return ()
return tuple(sorted(self.tasks, key=lambda t: t.duration, reverse=True)[:n])
def critical_tasks(self) -> tuple[TaskProfile, ...]:
"""返回关键路径上的所有任务(按路径顺序)。"""
critical_set = set(self.critical_path)
# 保持关键路径顺序
order = {name: i for i, name in enumerate(self.critical_path)}
return tuple(sorted((t for t in self.tasks if t.name in critical_set), key=lambda t: order[t.name]))
def failed_tasks(self) -> tuple[TaskProfile, ...]:
"""返回 FAILED 状态的任务。"""
return tuple(t for t in self.tasks if t.status == TaskStatus.FAILED)
def skipped_tasks(self) -> tuple[TaskProfile, ...]:
"""返回 SKIPPED 状态的任务。"""
return tuple(t for t in self.tasks if t.status == TaskStatus.SKIPPED)
# ------------------------------------------------------------------ #
# 输出
# ------------------------------------------------------------------ #
def to_dict(self) -> dict[str, Any]:
"""转为 JSON 友好的字典。"""
return {
"tasks": [t.to_dict() for t in self.tasks],
"total_duration_seconds": round(self.total_duration, 6),
"critical_path_duration_seconds": round(self.critical_path_duration, 6),
"critical_path": list(self.critical_path),
"avg_parallelism": round(self.avg_parallelism, 4),
"peak_parallelism": self.peak_parallelism,
"parallelism_efficiency": round(self.parallelism_efficiency, 4),
"bottlenecks": [t.to_dict() for t in self.top_bottlenecks(5)],
}
def to_html(self) -> str:
"""生成自包含的 HTML 报告(含 CSS,无外部依赖)。
报告含:图级指标卡片、关键路径、时间线甘特图、Top 瓶颈表格、
全部任务表格。适合直接用浏览器打开查看。
"""
return _render_html(self)
def describe(self) -> str:
lines: list[str] = []
lines.append("=" * 70)
lines.append("PyFlowX 性能剖面报告")
lines.append("=" * 70)
lines.append("")
lines.append("【图级指标】")
lines.append(f" 总耗时 (wall-clock): {self.total_duration:.3f}s")
lines.append(f" 关键路径耗时: {self.critical_path_duration:.3f}s")
lines.append(f" 平均并行度: {self.avg_parallelism:.2f}")
lines.append(f" 峰值并行度: {self.peak_parallelism}")
lines.append(f" 并行度效率: {self.parallelism_efficiency:.2%}")
lines.append(f" 任务总数: {len(self.tasks)}")
lines.append("")
# 关键路径
lines.append("【关键路径】")
if self.critical_path:
lines.append(f" {' -> '.join(self.critical_path)}")
else:
lines.append(" (无)")
lines.append("")
# Top 瓶颈
bottlenecks = self.top_bottlenecks(5)
lines.append(f"【Top {len(bottlenecks)} 瓶颈任务】")
if bottlenecks:
lines.append(f" {'任务':<30} {'耗时':>10} {'等待':>10} {'尝试':>6} {'关键路径':>8} {'状态':>8}")
lines.append(f" {'-' * 30} {'-' * 10} {'-' * 10} {'-' * 6} {'-' * 8} {'-' * 8}")
for t in bottlenecks:
critical_flag = "" if t.is_on_critical_path else ""
lines.append(
f" {t.name:<30} {t.duration:>9.3f}s {t.wait_time:>9.3f}s {t.attempts:>6} "
f"{critical_flag:>8} {t.status.value:>8}",
)
else:
lines.append(" (无)")
lines.append("")
# 全部任务详情
lines.append("【全部任务】")
if self.tasks:
lines.append(f" {'任务':<30} {'耗时':>10} {'等待':>10} {'尝试':>6} {'关键路径':>8} {'状态':>8}")
lines.append(f" {'-' * 30} {'-' * 10} {'-' * 10} {'-' * 6} {'-' * 8} {'-' * 8}")
for t in self.tasks:
critical_flag = "" if t.is_on_critical_path else ""
lines.append(
f" {t.name:<30} {t.duration:>9.3f}s {t.wait_time:>9.3f}s {t.attempts:>6} "
f"{critical_flag:>8} {t.status.value:>8}",
)
else:
lines.append(" (无)")
lines.append("")
lines.append("=" * 70)
return "\n".join(lines)
def __repr__(self) -> str:
return (
f"ProfileReport(tasks={len(self.tasks)}, "
f"total={self.total_duration:.3f}s, "
f"critical={self.critical_path_duration:.3f}s, "
f"avg_par={self.avg_parallelism:.2f}, "
f"peak_par={self.peak_parallelism})"
)
# ---------------------------------------------------------------------- #
# HTML 渲染(私有,零依赖)
# ---------------------------------------------------------------------- #
_HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>PyFlowX 性能剖面报告</title>
<style>
:root {{
--bg: #f5f5f7;
--card: #ffffff;
--border: #d2d2d7;
--text: #1d1d1f;
--muted: #6e6e73;
--accent: #0071e3;
--success: #34c759;
--warning: #ff9f0a;
--danger: #ff3b30;
--critical: #af52de;
}}
* {{ box-sizing: border-box; }}
body {{
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
margin: 0;
padding: 24px;
background: var(--bg);
color: var(--text);
line-height: 1.5;
}}
h1 {{ margin: 0 0 8px; font-size: 28px; }}
h2 {{ margin: 32px 0 12px; font-size: 20px; border-bottom: 1px solid var(--border); padding-bottom: 6px; }}
.subtitle {{ color: var(--muted); margin: 0 0 24px; font-size: 14px; }}
.cards {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); gap: 12px; margin-bottom: 8px; }}
.card {{
background: var(--card);
border: 1px solid var(--border);
border-radius: 10px;
padding: 16px;
}}
.card .label {{ font-size: 12px; color: var(--muted); margin-bottom: 4px; text-transform: uppercase; letter-spacing: 0.5px; }}
.card .value {{ font-size: 22px; font-weight: 600; }}
.card .unit {{ font-size: 13px; color: var(--muted); margin-left: 2px; }}
.critical-path {{
background: var(--card);
border: 1px solid var(--border);
border-left: 4px solid var(--critical);
border-radius: 10px;
padding: 16px;
margin-bottom: 8px;
}}
.critical-path .label {{ font-size: 12px; color: var(--muted); margin-bottom: 8px; text-transform: uppercase; letter-spacing: 0.5px; }}
.critical-path .chain {{ font-family: ui-monospace, "SF Mono", Menlo, monospace; font-size: 13px; word-break: break-all; }}
.critical-path .arrow {{ color: var(--critical); margin: 0 6px; font-weight: 600; }}
/* 甘特图 */
.gantt {{
background: var(--card);
border: 1px solid var(--border);
border-radius: 10px;
padding: 16px;
overflow-x: auto;
}}
.gantt-row {{ display: flex; align-items: center; margin-bottom: 6px; min-width: 600px; }}
.gantt-label {{ width: 200px; flex-shrink: 0; font-size: 13px; font-family: ui-monospace, monospace; overflow: hidden; text-overflow: ellipsis; white-space: nowrap; }}
.gantt-track {{ flex: 1; height: 22px; background: #f0f0f3; border-radius: 4px; position: relative; }}
.gantt-bar {{ position: absolute; height: 100%; border-radius: 4px; min-width: 2px; }}
.gantt-bar.success {{ background: var(--success); }}
.gantt-bar.failed {{ background: var(--danger); }}
.gantt-bar.skipped {{ background: var(--muted); }}
.gantt-bar.critical {{ box-shadow: 0 0 0 2px var(--critical) inset; }}
.gantt-bar:hover {{ opacity: 0.85; }}
.gantt-tooltip {{ position: absolute; bottom: 100%; left: 50%; transform: translateX(-50%); background: #1d1d1f; color: #fff; padding: 4px 8px; border-radius: 4px; font-size: 11px; white-space: nowrap; opacity: 0; pointer-events: none; transition: opacity 0.15s; }}
.gantt-bar:hover .gantt-tooltip {{ opacity: 1; }}
/* 表格 */
table {{ width: 100%; border-collapse: collapse; background: var(--card); border-radius: 10px; overflow: hidden; border: 1px solid var(--border); }}
th, td {{ padding: 10px 12px; text-align: left; font-size: 13px; }}
th {{ background: #fafafa; font-weight: 600; color: var(--muted); text-transform: uppercase; font-size: 11px; letter-spacing: 0.5px; }}
tbody tr {{ border-top: 1px solid var(--border); }}
tbody tr:hover {{ background: #fafafa; }}
td.num {{ font-family: ui-monospace, monospace; text-align: right; }}
.badge {{ display: inline-block; padding: 2px 8px; border-radius: 10px; font-size: 11px; font-weight: 500; }}
.badge.success {{ background: rgba(52,199,89,0.15); color: var(--success); }}
.badge.failed {{ background: rgba(255,59,48,0.15); color: var(--danger); }}
.badge.skipped {{ background: rgba(110,110,115,0.15); color: var(--muted); }}
.star {{ color: var(--critical); font-weight: 700; }}
.footer {{ margin-top: 32px; color: var(--muted); font-size: 12px; text-align: center; }}
</style>
</head>
<body>
<h1>PyFlowX 性能剖面报告</h1>
<p class="subtitle">由 <code>pxp</code> 生成 · {generated_at}</p>
<h2>图级指标</h2>
<div class="cards">
<div class="card"><div class="label">总耗时</div><div class="value">{total_duration:.3f}<span class="unit">s</span></div></div>
<div class="card"><div class="label">关键路径耗时</div><div class="value">{critical_duration:.3f}<span class="unit">s</span></div></div>
<div class="card"><div class="label">平均并行度</div><div class="value">{avg_par:.2f}</div></div>
<div class="card"><div class="label">峰值并行度</div><div class="value">{peak_par}</div></div>
<div class="card"><div class="label">并行度效率</div><div class="value">{efficiency:.1f}<span class="unit">%</span></div></div>
<div class="card"><div class="label">任务总数</div><div class="value">{task_count}</div></div>
</div>
<h2>关键路径</h2>
<div class="critical-path">
<div class="label">最长依赖路径(串行瓶颈)</div>
<div class="chain">{critical_chain}</div>
</div>
<h2>任务时间线</h2>
<div class="gantt">
{gantt_rows}
</div>
<h2>Top 瓶颈任务</h2>
<table>
<thead><tr><th>任务</th><th class="num">耗时</th><th class="num">等待</th><th class="num">尝试</th><th>关键路径</th><th>状态</th></tr></thead>
<tbody>
{bottleneck_rows}
</tbody>
</table>
<h2>全部任务</h2>
<table>
<thead><tr><th>任务</th><th class="num">耗时</th><th class="num">等待</th><th class="num">尝试</th><th>关键路径</th><th>状态</th><th>依赖</th></tr></thead>
<tbody>
{all_task_rows}
</tbody>
</table>
<div class="footer">由 PyFlowX · pxp 生成</div>
</body>
</html>"""
def _status_badge(status: TaskStatus) -> str:
"""生成状态徽章 HTML。"""
cls = status.value
return f'<span class="badge {cls}">{cls}</span>'
def _format_critical_chain(path: tuple[str, ...]) -> str:
"""格式化关键路径为 HTML 链。"""
if not path:
return '<em style="color:var(--muted)">(无)</em>'
arrow = '<span class="arrow">→</span>'
return arrow.join(f"<strong>{name}</strong>" for name in path)
def _render_gantt(profile: ProfileReport) -> str:
"""渲染甘特图行 HTML。
每个任务一行:标签 + 时间条。时间条位置基于 wait_time + 依赖关系
重建相对开始时间(相对最早任务起点),归一化到 0-100% 宽度。
SKIPPED 任务不显示(无时间戳)。
"""
visible = [t for t in profile.tasks if t.status != TaskStatus.SKIPPED and t.duration > 0]
if not visible:
return '<div style="color:var(--muted);padding:12px;">(无时间线数据)</div>'
# 重建相对开始时间:start[name] = max(end[dep]) + wait_time
# profile.tasks 已是拓扑序,可直接按序计算
start: dict[str, float] = {}
end: dict[str, float] = {}
for t in profile.tasks:
if t.status == TaskStatus.SKIPPED:
continue
dep_end = 0.0
for dep in t.deps:
dep_end = max(dep_end, end.get(dep, 0.0))
s = dep_end + t.wait_time
start[t.name] = s
end[t.name] = s + t.duration
# 归一化:以最早开始时间为 0,最晚结束为 100%
min_start = min(start.get(t.name, 0.0) for t in visible)
max_end = max(end.get(t.name, 0.0) for t in visible)
span = max_end - min_start
if span <= 0:
span = 1.0
rows: list[str] = []
for t in visible:
s = start.get(t.name, 0.0) - min_start
left_pct = (s / span) * 100
width_pct = (t.duration / span) * 100
cls = t.status.value
critical_cls = " critical" if t.is_on_critical_path else ""
tooltip = f"{t.name}: {t.duration:.3f}s @ +{s:.3f}s ({t.status.value})"
rows.append(
f' <div class="gantt-row">'
f'<div class="gantt-label" title="{t.name}">{t.name}</div>'
f'<div class="gantt-track">'
f'<div class="gantt-bar {cls}{critical_cls}" style="left:{left_pct:.2f}%;width:{width_pct:.2f}%">'
f'<span class="gantt-tooltip">{tooltip}</span>'
f"</div></div></div>"
)
return "\n".join(rows)
def _render_task_row(t: TaskProfile, show_deps: bool = False) -> str:
"""渲染任务表格行 HTML。"""
star = '<span class="star">★</span>' if t.is_on_critical_path else ""
deps = ", ".join(t.deps) if show_deps and t.deps else ""
deps_cell = f"<td>{deps}</td>" if show_deps else ""
return (
f" <tr>"
f"<td><code>{t.name}</code></td>"
f'<td class="num">{t.duration:.3f}s</td>'
f'<td class="num">{t.wait_time:.3f}s</td>'
f'<td class="num">{t.attempts}</td>'
f"<td>{star}</td>"
f"<td>{_status_badge(t.status)}</td>"
f"{deps_cell}"
f"</tr>"
)
def _render_html(profile: ProfileReport) -> str:
"""渲染完整 HTML 报告。"""
from datetime import datetime as _dt
bottlenecks = profile.top_bottlenecks(5)
bottleneck_rows = (
"\n".join(_render_task_row(t) for t in bottlenecks)
or ' <tr><td colspan="6" style="color:var(--muted);">(无)</td></tr>'
)
all_task_rows = (
"\n".join(_render_task_row(t, show_deps=True) for t in profile.tasks)
or ' <tr><td colspan="7" style="color:var(--muted);">(无)</td></tr>'
)
return _HTML_TEMPLATE.format(
generated_at=_dt.now().strftime("%Y-%m-%d %H:%M:%S"),
total_duration=profile.total_duration,
critical_duration=profile.critical_path_duration,
avg_par=profile.avg_parallelism,
peak_par=profile.peak_parallelism,
efficiency=profile.parallelism_efficiency * 100,
task_count=len(profile.tasks),
critical_chain=_format_critical_chain(profile.critical_path),
gantt_rows=_render_gantt(profile),
bottleneck_rows=bottleneck_rows,
all_task_rows=all_task_rows,
)
+545
View File
@@ -0,0 +1,545 @@
"""pxp 性能分析器测试.
覆盖策略:
* HTML 渲染:to_html() 输出结构正确,含关键章节。
* pxp CLI:参数解析、脚本执行、报告生成、浏览器调用、错误处理。
* hook 注入:捕获 px.run() 调用,还原原始函数。
"""
from __future__ import annotations
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
import pytest
import pyflowx as px
from pyflowx.cli import profiler
from pyflowx.profiling import ProfileReport
from pyflowx.report import RunReport
from pyflowx.task import TaskResult, TaskSpec, TaskStatus
def _fn() -> int:
return 1
def _spec(name: str, deps: tuple[str, ...] = ()) -> TaskSpec[Any]:
return TaskSpec[Any](name, _fn, depends_on=deps)
def _result(
name: str,
start: datetime,
duration: float,
*,
status: TaskStatus = TaskStatus.SUCCESS,
attempts: int = 1,
) -> TaskResult[Any]:
"""构造带时间戳的 TaskResult."""
end = start + timedelta(seconds=duration) if duration > 0 else start
return TaskResult[Any](
spec=_spec(name),
status=status,
value=None,
attempts=attempts,
started_at=start if duration > 0 or status != TaskStatus.SKIPPED else None,
finished_at=end if duration > 0 or status != TaskStatus.SKIPPED else None,
)
def _build_simple_profile() -> ProfileReport:
"""构造一个简单的 ProfileReport 用于测试 HTML 输出."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = _result("b", start + timedelta(seconds=1), 2.0)
graph = px.Graph.from_specs([
_spec("a"),
_spec("b", deps=("a",)),
])
return ProfileReport.from_report(report, graph)
class TestToHtml:
"""测试 ProfileReport.to_html()."""
def test_to_html_contains_key_sections(self) -> None:
"""HTML 应包含所有关键章节标题。"""
profile = _build_simple_profile()
html = profile.to_html()
assert "<!DOCTYPE html>" in html
assert "PyFlowX 性能剖面报告" in html
assert "图级指标" in html
assert "关键路径" in html
assert "任务时间线" in html
assert "Top 瓶颈任务" in html
assert "全部任务" in html
def test_to_html_contains_metrics(self) -> None:
"""HTML 应包含图级指标数值。"""
profile = _build_simple_profile()
html = profile.to_html()
# 总耗时 3.0s (a=1 + b=2)
assert "3.000" in html
# 任务名
assert "a" in html
assert "b" in html
def test_to_html_contains_critical_path(self) -> None:
"""HTML 应包含关键路径任务链。"""
profile = _build_simple_profile()
html = profile.to_html()
# 关键路径是 a -> b
assert "<strong>a</strong>" in html
assert "<strong>b</strong>" in html
def test_to_html_contains_gantt_bars(self) -> None:
"""HTML 应包含甘特图条。"""
profile = _build_simple_profile()
html = profile.to_html()
assert "gantt-row" in html
assert "gantt-bar" in html
# 每个非 SKIPPED 任务一个条
assert html.count("gantt-bar") >= 2
def test_to_html_empty_profile(self) -> None:
"""空报告的 HTML 应不崩溃。"""
report = px.RunReport()
graph = px.Graph()
profile = ProfileReport.from_report(report, graph)
html = profile.to_html()
assert "PyFlowX 性能剖面报告" in html
assert "(无)" in html
def test_to_html_with_failed_task(self) -> None:
"""含 FAILED 任务的 HTML 应包含失败状态徽章。"""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0, status=TaskStatus.FAILED)
graph = px.Graph.from_specs([_spec("a")])
profile = ProfileReport.from_report(report, graph)
html = profile.to_html()
assert "failed" in html
assert "badge" in html
def test_to_html_with_skipped_task(self) -> None:
"""含 SKIPPED 任务的 HTML 不应在甘特图中显示该任务。"""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = TaskResult[Any](
spec=_spec("b"),
status=TaskStatus.SKIPPED,
reason="skip",
)
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
profile = ProfileReport.from_report(report, graph)
html = profile.to_html()
# SKIPPED 任务的徽章应出现
assert "skipped" in html
def test_to_html_self_contained(self) -> None:
"""HTML 应自包含(无外部依赖)。"""
profile = _build_simple_profile()
html = profile.to_html()
# 不引用外部资源
assert "<link" not in html
assert "<script src" not in html
class TestProfilerArgumentParsing:
"""测试 pxp CLI 参数解析。"""
def test_default_export_is_html(self) -> None:
"""默认导出格式为 html。"""
parser = profiler._build_parser()
args, remaining = parser.parse_known_args(["pymake.py"])
assert args.export == "html"
assert args.no_browser is False
assert args.output is None
assert remaining == ["pymake.py"]
def test_export_text(self) -> None:
"""-E text 应设置导出格式为 text。"""
parser = profiler._build_parser()
args, _ = parser.parse_known_args(["-E", "text", "pymake.py"])
assert args.export == "text"
def test_no_browser_flag(self) -> None:
"""--no-browser 应设置标志。"""
parser = profiler._build_parser()
args, _ = parser.parse_known_args(["--no-browser", "pymake.py"])
assert args.no_browser is True
def test_output_option(self) -> None:
"""-o 应设置输出路径。"""
parser = profiler._build_parser()
args, _ = parser.parse_known_args(["-o", "report.html", "pymake.py"])
assert args.output == "report.html"
def test_script_args_separated(self) -> None:
"""脚本参数应通过 remaining 分离。"""
parser = profiler._build_parser()
_, remaining = parser.parse_known_args(["pymake.py", "t", "--quiet"])
assert remaining == ["pymake.py", "t", "--quiet"]
def test_no_args_prints_help(
self,
capsys: pytest.CaptureFixture[str],
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""无参数应打印帮助并以退出码 2 退出。"""
monkeypatch.setattr(sys, "argv", ["pxp"])
with pytest.raises(SystemExit) as exc_info:
profiler.main()
assert exc_info.value.code == 2
captured = capsys.readouterr()
assert "usage" in captured.out.lower() or "usage" in captured.err.lower()
class TestCapturePxRun:
"""测试 _capture_px_run hook 注入。"""
def test_capture_captures_run_call(self) -> None:
"""hook 应捕获 px.run() 调用的 graph 和 report。"""
captured = profiler._capture_px_run()
try:
graph = px.Graph.from_specs([px.TaskSpec("a", lambda: 1)])
px.run(graph, strategy="sequential")
assert "graph" in captured
assert "report" in captured
assert captured["graph"] is graph
finally:
captured["_restore"]()
def test_capture_restores_original(self) -> None:
"""还原后 px.run 和 RunReport.__init__ 应恢复为原函数。"""
original_run = px.run
original_init = RunReport.__init__
captured = profiler._capture_px_run()
# 注入期间 px.run 和 RunReport.__init__ 已被替换
assert px.run is not original_run
assert RunReport.__init__ is not original_init
captured["_restore"]()
# 还原后恢复
assert px.run is original_run
assert RunReport.__init__ is original_init
def test_capture_via_runner_run(self) -> None:
"""hook 应捕获通过 CliRunner 执行的 run() 调用。"""
from pyflowx import runner as runner_mod
captured = profiler._capture_px_run()
try:
# 验证 runner.run 也被 patch(指向 patched_run
assert runner_mod.run is px.executors.run
graph = px.Graph.from_specs([px.TaskSpec("a", lambda: 1)])
runner_mod.run(graph, strategy="sequential")
assert "report" in captured
finally:
captured["_restore"]()
def test_capture_captures_report_on_failure(self) -> None:
"""run() 抛出 TaskFailedError 时仍应捕获 report 实例。"""
from pyflowx.executors import TaskFailedError
def failing() -> None:
raise RuntimeError("boom")
graph = px.Graph.from_specs([px.TaskSpec("a", failing)])
captured = profiler._capture_px_run()
try:
with pytest.raises(TaskFailedError):
px.run(graph, strategy="sequential")
# 即使 run() 抛异常,report 也应被捕获(含已执行任务的结果)
assert "report" in captured
assert "graph" in captured
assert captured["graph"] is graph
finally:
captured["_restore"]()
class TestRunTargetScript:
"""测试 _run_target_script。"""
def test_run_simple_script(self, tmp_path: Path) -> None:
"""应能执行简单脚本并返回模块字典。"""
script = tmp_path / "simple.py"
script.write_text("x = 42\n", encoding="utf-8")
result = profiler._run_target_script(script, [])
assert result["x"] == 42
def test_run_script_with_sys_exit(self, tmp_path: Path) -> None:
"""脚本调用 sys.exit 应抛 SystemExit。"""
script = tmp_path / "exit.py"
script.write_text("import sys; sys.exit(0)\n", encoding="utf-8")
with pytest.raises(SystemExit):
profiler._run_target_script(script, [])
def test_run_script_sets_argv(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""应正确设置 sys.argv。"""
script = tmp_path / "argv.py"
script.write_text(
"import sys\nassert sys.argv[0] == __file__\nassert sys.argv[1:] == ['arg1', 'arg2']\n",
encoding="utf-8",
)
profiler._run_target_script(script, ["arg1", "arg2"])
def test_run_script_adds_dir_to_path(self, tmp_path: Path) -> None:
"""脚本所在目录应加入 sys.path。"""
script = tmp_path / "pathcheck.py"
script.write_text(
"import sys, os\nassert os.path.dirname(__file__) in sys.path\n",
encoding="utf-8",
)
profiler._run_target_script(script, [])
class TestOutputReport:
"""测试 _output_report。"""
def test_output_text_format(
self,
capsys: pytest.CaptureFixture[str],
) -> None:
"""text 格式应打印 describe() 到 stdout。"""
profile = _build_simple_profile()
profiler._output_report(profile, export="text", output=None, script_stem="test", no_browser=True)
captured = capsys.readouterr()
assert "PyFlowX 性能剖面报告" in captured.out
assert "图级指标" in captured.out
def test_output_html_default_filename(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""HTML 默认输出到 <script>_profile.html。"""
monkeypatch.chdir(tmp_path)
profile = _build_simple_profile()
profiler._output_report(profile, export="html", output=None, script_stem="mymake", no_browser=True)
out_file = tmp_path / "mymake_profile.html"
assert out_file.exists()
content = out_file.read_text(encoding="utf-8")
assert "PyFlowX 性能剖面报告" in content
def test_output_html_custom_path(self, tmp_path: Path) -> None:
"""HTML 应写入指定路径。"""
out_file = tmp_path / "custom.html"
profile = _build_simple_profile()
profiler._output_report(profile, export="html", output=str(out_file), script_stem="test", no_browser=True)
assert out_file.exists()
assert "PyFlowX" in out_file.read_text(encoding="utf-8")
def test_output_html_opens_browser(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""no_browser=False 应调用 webbrowser.open。"""
monkeypatch.chdir(tmp_path)
opened: list[str] = []
monkeypatch.setattr(profiler.webbrowser, "open", opened.append)
profile = _build_simple_profile()
profiler._output_report(profile, export="html", output=None, script_stem="test", no_browser=False)
assert len(opened) == 1
assert opened[0].startswith("file://")
def test_output_html_no_browser_flag(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""no_browser=True 不应调用 webbrowser.open。"""
monkeypatch.chdir(tmp_path)
opened: list[str] = []
monkeypatch.setattr(profiler.webbrowser, "open", opened.append)
profile = _build_simple_profile()
profiler._output_report(profile, export="html", output=None, script_stem="test", no_browser=True)
assert len(opened) == 0
class TestProfilerMainIntegration:
"""main() 集成测试。"""
def test_main_analyses_script_with_px_run(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""main() 应分析含 px.run() 的脚本并生成 HTML。"""
script = tmp_path / "mytool.py"
script.write_text(
"import pyflowx as px\n"
"graph = px.Graph.from_specs([\n"
" px.TaskSpec('a', lambda: 1),\n"
" px.TaskSpec('b', lambda: 2, depends_on=('a',)),\n"
"])\n"
"px.run(graph, strategy='sequential')\n",
encoding="utf-8",
)
out_file = tmp_path / "report.html"
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", "-o", str(out_file), str(script)])
profiler.main()
assert out_file.exists()
content = out_file.read_text(encoding="utf-8")
assert "PyFlowX 性能剖面报告" in content
assert "任务时间线" in content
def test_main_analyses_script_with_clirunner(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""main() 应分析含 CliRunner 的脚本。"""
script = tmp_path / "clirunner_tool.py"
script.write_text(
"import pyflowx as px\n"
"runner = px.CliRunner(\n"
" aliases={'t': px.TaskSpec('t', lambda: 1)},\n"
")\n"
"runner.run_cli(['t'])\n",
encoding="utf-8",
)
out_file = tmp_path / "report.html"
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", "-o", str(out_file), str(script)])
profiler.main()
assert out_file.exists()
content = out_file.read_text(encoding="utf-8")
assert "PyFlowX 性能剖面报告" in content
def test_main_text_export(
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]
) -> None:
"""main() -E text 应输出文本到 stdout。"""
script = tmp_path / "simple.py"
script.write_text(
"import pyflowx as px\n"
"graph = px.Graph.from_specs([px.TaskSpec('a', lambda: 1)])\n"
"px.run(graph, strategy='sequential')\n",
encoding="utf-8",
)
monkeypatch.setattr(sys, "argv", ["pxp", "-E", "text", "--no-browser", str(script)])
profiler.main()
captured = capsys.readouterr()
assert "PyFlowX 性能剖面报告" in captured.out
def test_main_script_not_exist(
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]
) -> None:
"""脚本不存在应以退出码 2 退出。"""
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", str(tmp_path / "nonexistent.py")])
with pytest.raises(SystemExit) as exc_info:
profiler.main()
assert exc_info.value.code == 2
def test_main_no_px_run_captured(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""脚本未调用 px.run() 应以退出码 1 退出。"""
script = tmp_path / "no_run.py"
script.write_text("print('just printing')\n", encoding="utf-8")
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", str(script)])
with pytest.raises(SystemExit) as exc_info:
profiler.main()
assert exc_info.value.code == 1
def test_main_passes_script_args(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""应将脚本参数传递给目标脚本。"""
script = tmp_path / "argcheck.py"
script.write_text(
"import sys\n"
"assert sys.argv[1:] == ['myarg'], f'got {sys.argv[1:]}'\n"
"import pyflowx as px\n"
"px.run(px.Graph.from_specs([px.TaskSpec('a', lambda: 1)]), strategy='sequential')\n",
encoding="utf-8",
)
out_file = tmp_path / "report.html"
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", "-o", str(out_file), str(script), "myarg"])
profiler.main() # 不抛异常即成功
def test_main_handles_script_exception(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""脚本抛异常时应捕获并继续生成报告(如果有 report)。"""
script = tmp_path / "raise.py"
script.write_text(
"import pyflowx as px\n"
"px.run(px.Graph.from_specs([px.TaskSpec('a', lambda: 1)]), strategy='sequential')\n"
"raise RuntimeError('after run')\n",
encoding="utf-8",
)
out_file = tmp_path / "report.html"
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", "-o", str(out_file), str(script)])
profiler.main() # 不抛异常即成功
assert out_file.exists()
def test_main_auto_calls_main_when_no_main_block(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""脚本无 __main__ 块但定义了 main() 时应自动调用。"""
script = tmp_path / "no_main_block.py"
script.write_text(
"import pyflowx as px\n"
"def main():\n"
" px.run(px.Graph.from_specs([px.TaskSpec('a', lambda: 1)]), strategy='sequential')\n"
"# 无 if __name__ == '__main__'\n",
encoding="utf-8",
)
out_file = tmp_path / "report.html"
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", "-o", str(out_file), str(script)])
profiler.main()
assert out_file.exists()
def test_main_auto_calls_main_with_clirunner(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""脚本无 __main__ 块但定义了调用 CliRunner 的 main() 时应自动调用。"""
script = tmp_path / "cli_tool.py"
script.write_text(
"import pyflowx as px\n"
"def main():\n"
" runner = px.CliRunner(\n"
" aliases={'t': px.TaskSpec('t', lambda: 1)},\n"
" )\n"
" runner.run_cli(['t'])\n",
encoding="utf-8",
)
out_file = tmp_path / "report.html"
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", "-o", str(out_file), str(script), "t"])
profiler.main()
assert out_file.exists()
content = out_file.read_text(encoding="utf-8")
assert "PyFlowX 性能剖面报告" in content
def test_main_no_main_function_exits_with_1(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""脚本无 main() 且未调用 px.run() 应以退出码 1 退出。"""
script = tmp_path / "no_main.py"
script.write_text("x = 1\n", encoding="utf-8")
monkeypatch.setattr(sys, "argv", ["pxp", "--no-browser", str(script)])
with pytest.raises(SystemExit) as exc_info:
profiler.main()
assert exc_info.value.code == 1
class TestTryCallMain:
"""测试 _try_call_main。"""
def test_calls_main_when_present(self) -> None:
"""模块字典含 main 可调用对象时应调用它。"""
called: list[bool] = []
def fake_main() -> None:
called.append(True)
profiler._try_call_main({"main": fake_main})
assert called == [True]
def test_no_main_does_nothing(self) -> None:
"""模块字典不含 main 时不应报错。"""
profiler._try_call_main({}) # 不抛异常即成功
def test_non_callable_main_does_nothing(self) -> None:
"""main 不是可调用对象时不应报错。"""
profiler._try_call_main({"main": "not a function"}) # 不抛异常即成功
+17
View File
@@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
@@ -123,6 +124,22 @@ class TestTaskSpecDefinitions:
assert spec.cmd == ["tox", "-p", "auto"] assert spec.cmd == ["tox", "-p", "auto"]
assert spec.skip_if_missing is False assert spec.skip_if_missing is False
def test_all_tasks_have_correct_cwd(self) -> None:
"""所有任务应该有正确的 cwd 设置(指向项目根目录)."""
# 验证 ROOT_DIR 定义正确(向上三层到达项目根目录)
expected_root = Path(__file__).parent.parent.parent
assert expected_root == pymake.ROOT_DIR
# 验证 tasks 中的所有命令任务都有正确的 cwd
for spec in pymake.tasks:
if spec.cmd is not None:
assert spec.cwd == pymake.ROOT_DIR, f"任务 {spec.name} 的 cwd 应为 {pymake.ROOT_DIR}"
# 验证 aliases 中的内联任务(doc/lint/tox)也有正确的 cwd
for name in ("doc", "lint", "tox"):
spec = _find_task(name)
assert spec.cwd == pymake.ROOT_DIR, f"任务 {name} 的 cwd 应为 {pymake.ROOT_DIR}"
# ---------------------------------------------------------------------- # # ---------------------------------------------------------------------- #
# main function # main function
+574
View File
@@ -0,0 +1,574 @@
"""性能剖面(ProfileReport)测试.
覆盖策略:
* 构造带时间戳的 RunReport + Graph,验证关键路径、并行度、瓶颈排序。
* 边界场景:空报告、单任务、无时间戳、SKIPPED 任务、图校验失败。
* 输出格式:to_dict / describe / top_bottlenecks / critical_tasks。
"""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any
import pyflowx as px
from pyflowx.profiling import ProfileReport, TaskProfile
from pyflowx.task import TaskResult, TaskSpec, TaskStatus
def _fn() -> int:
return 1
def _spec(name: str, deps: tuple[str, ...] = ()) -> TaskSpec[Any]:
return TaskSpec[Any](name, _fn, depends_on=deps)
def _result(
name: str,
start: datetime,
duration: float,
*,
status: TaskStatus = TaskStatus.SUCCESS,
attempts: int = 1,
) -> TaskResult[Any]:
"""构造带时间戳的 TaskResult."""
end = start + timedelta(seconds=duration) if duration > 0 else start
return TaskResult[Any](
spec=_spec(name),
status=status,
value=None,
attempts=attempts,
started_at=start if duration > 0 or status != TaskStatus.SKIPPED else None,
finished_at=end if duration > 0 or status != TaskStatus.SKIPPED else None,
)
def _skipped_result(name: str, reason: str = "skip") -> TaskResult[Any]:
"""构造 SKIPPED 结果(无时间戳)."""
return TaskResult[Any](
spec=_spec(name),
status=TaskStatus.SKIPPED,
reason=reason,
)
class TestProfileReportConstruction:
"""测试 ProfileReport 构建."""
def test_empty_report(self) -> None:
"""空报告应产生空剖面."""
report = px.RunReport()
graph = px.Graph()
profile = ProfileReport.from_report(report, graph)
assert len(profile.tasks) == 0
assert profile.total_duration == 0.0
assert profile.critical_path == ()
assert profile.critical_path_duration == 0.0
assert profile.avg_parallelism == 0.0
assert profile.peak_parallelism == 0
def test_single_task(self) -> None:
"""单任务:关键路径就是它自己,并行度为 1."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.5)
graph = px.Graph.from_specs([_spec("a")])
profile = ProfileReport.from_report(report, graph)
assert len(profile.tasks) == 1
assert profile.tasks[0].name == "a"
assert profile.tasks[0].duration == 1.5
assert profile.tasks[0].is_on_critical_path
assert profile.total_duration == 1.5
assert profile.critical_path == ("a",)
assert profile.critical_path_duration == 1.5
assert profile.avg_parallelism == 1.0
assert profile.peak_parallelism == 1
assert profile.parallelism_efficiency == 1.0
def test_serial_chain(self) -> None:
"""串行链 a -> b -> c:关键路径为全部,效率 100%."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = _result("b", start + timedelta(seconds=1), 2.0)
report.results["c"] = _result("c", start + timedelta(seconds=3), 1.5)
graph = px.Graph.from_specs([
_spec("a"),
_spec("b", deps=("a",)),
_spec("c", deps=("b",)),
])
profile = ProfileReport.from_report(report, graph)
assert profile.total_duration == 4.5
assert profile.critical_path_duration == 4.5
assert profile.critical_path == ("a", "b", "c")
assert profile.parallelism_efficiency == 1.0
assert profile.peak_parallelism == 1
assert profile.avg_parallelism == 1.0
def test_parallel_tasks(self) -> None:
"""并行任务 a, b 同时执行:关键路径取较长者,效率 < 1."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = _result("b", start, 2.0)
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
profile = ProfileReport.from_report(report, graph)
# wall-clock = 2.0, 关键路径 = 2.0 (b), 效率 = 1.0
# 因为关键路径定义就是最长路径,与 wall-clock 相同
assert profile.total_duration == 2.0
assert profile.critical_path_duration == 2.0
assert profile.critical_path == ("b",)
assert profile.peak_parallelism == 2
# 平均并行度 = (1.0 + 2.0) / 2.0 = 1.5
assert profile.avg_parallelism == 1.5
def test_parallel_with_join(self) -> None:
"""a, b 并行后 join 到 c:关键路径 a->c 或 b->c."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = _result("b", start, 3.0)
report.results["c"] = _result("c", start + timedelta(seconds=3), 1.0)
graph = px.Graph.from_specs([
_spec("a"),
_spec("b"),
_spec("c", deps=("a", "b")),
])
profile = ProfileReport.from_report(report, graph)
# 关键路径 = b -> c (3 + 1 = 4)
assert profile.critical_path_duration == 4.0
assert profile.critical_path == ("b", "c")
assert profile.tasks[0].is_on_critical_path is False # a 不在关键路径
# task("b") 在关键路径上
assert profile.task("b").is_on_critical_path
assert profile.task("c").is_on_critical_path
def test_skipped_task_no_timestamp(self) -> None:
"""SKIPPED 任务无时间戳:不影响并行度计算."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = _skipped_result("b")
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
profile = ProfileReport.from_report(report, graph)
# b 是 SKIPPEDduration=0
assert profile.task("b").status == TaskStatus.SKIPPED
assert profile.task("b").duration == 0.0
assert profile.peak_parallelism == 1 # 只有 a 在跑
class TestWaitTime:
"""测试等待时间计算."""
def test_no_deps_zero_wait(self) -> None:
"""无依赖任务等待时间为 0."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
graph = px.Graph.from_specs([_spec("a")])
profile = ProfileReport.from_report(report, graph)
assert profile.task("a").wait_time == 0.0
def test_wait_after_dep_completes(self) -> None:
"""b 在 a 完成后等待 0.5s 才开始."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = _result("b", start + timedelta(seconds=1.5), 1.0)
graph = px.Graph.from_specs([
_spec("a"),
_spec("b", deps=("a",)),
])
profile = ProfileReport.from_report(report, graph)
assert profile.task("b").wait_time == 0.5
def test_wait_negative_clamped_to_zero(self) -> None:
"""b 在 a 完成前就开始(异常情况)应钳制为 0."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 2.0)
# b 在 a 还没完成时就开始(不应该但可能发生)
report.results["b"] = _result("b", start + timedelta(seconds=1), 1.0)
graph = px.Graph.from_specs([
_spec("a"),
_spec("b", deps=("a",)),
])
profile = ProfileReport.from_report(report, graph)
# a 在 t=2 结束,b 在 t=1 开始,delta = -1,钳制为 0
assert profile.task("b").wait_time == 0.0
def test_skipped_task_zero_wait(self) -> None:
"""SKIPPED 任务等待时间为 0."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = _skipped_result("b")
graph = px.Graph.from_specs([
_spec("a"),
_spec("b", deps=("a",)),
])
profile = ProfileReport.from_report(report, graph)
assert profile.task("b").wait_time == 0.0
class TestCriticalPath:
"""测试关键路径分析."""
def test_diamond_dependency(self) -> None:
"""菱形依赖:a -> b -> d, a -> c -> d,关键路径取较长分支."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = _result("b", start + timedelta(seconds=1), 3.0)
report.results["c"] = _result("c", start + timedelta(seconds=1), 1.0)
report.results["d"] = _result("d", start + timedelta(seconds=4), 1.0)
graph = px.Graph.from_specs([
_spec("a"),
_spec("b", deps=("a",)),
_spec("c", deps=("a",)),
_spec("d", deps=("b", "c")),
])
profile = ProfileReport.from_report(report, graph)
# 关键路径:a -> b -> d = 1 + 3 + 1 = 5
assert profile.critical_path_duration == 5.0
assert profile.critical_path == ("a", "b", "d")
def test_graph_validation_failure_returns_empty(self) -> None:
"""图校验失败(有环)应回退为空关键路径."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
# 手动构造带环的图(绕过校验)
graph = px.Graph()
graph.specs["a"] = _spec("a", deps=("b",))
graph.specs["b"] = _spec("b", deps=("a",))
graph.deps["a"] = ("b",)
graph.deps["b"] = ("a",)
profile = ProfileReport.from_report(report, graph)
# layers() 抛 CycleError,回退为空
assert profile.critical_path == ()
assert profile.critical_path_duration == 0.0
class TestParallelism:
"""测试并行度计算."""
def test_no_timestamps_zero_parallelism(self) -> None:
"""所有任务无时间戳:并行度为 0."""
report = px.RunReport()
report.results["a"] = TaskResult[Any](spec=_spec("a"), status=TaskStatus.SUCCESS)
graph = px.Graph.from_specs([_spec("a")])
profile = ProfileReport.from_report(report, graph)
assert profile.avg_parallelism == 0.0
assert profile.peak_parallelism == 0
def test_zero_duration_excluded(self) -> None:
"""零耗时任务(end <= start)不参与并行度计算."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 0.0) # 零耗时
report.results["b"] = _result("b", start, 1.0)
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
profile = ProfileReport.from_report(report, graph)
# 只有 b 参与,峰值 = 1
assert profile.peak_parallelism == 1
def test_skipped_with_timestamps_excluded(self) -> None:
"""SKIPPED 任务即使带时间戳也不参与并行度计算."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
# SKIPPED 但带时间戳(异常但可能发生)
report.results["a"] = _result("a", start, 1.0, status=TaskStatus.SKIPPED)
report.results["b"] = _result("b", start, 1.0)
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
profile = ProfileReport.from_report(report, graph)
# a 是 SKIPPED,被排除;只有 b 参与
assert profile.peak_parallelism == 1
def test_peak_parallelism_three_tasks(self) -> None:
"""三个任务完全重叠:峰值并行度 = 3."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 3.0)
report.results["b"] = _result("b", start, 3.0)
report.results["c"] = _result("c", start, 3.0)
graph = px.Graph.from_specs([_spec("a"), _spec("b"), _spec("c")])
profile = ProfileReport.from_report(report, graph)
assert profile.peak_parallelism == 3
assert profile.avg_parallelism == 3.0
class TestQueries:
"""测试查询方法."""
def test_task_lookup(self) -> None:
"""task(name) 应返回对应剖面."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = _result("b", start, 2.0)
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
profile = ProfileReport.from_report(report, graph)
assert profile.task("a").name == "a"
assert profile.task("b").duration == 2.0
def test_task_lookup_not_found(self) -> None:
"""task(name) 不存在应抛 KeyError."""
report = px.RunReport()
graph = px.Graph()
profile = ProfileReport.from_report(report, graph)
try:
profile.task("missing")
except KeyError:
pass
else:
raise AssertionError("应抛出 KeyError")
def test_top_bottlenecks(self) -> None:
"""top_bottlenecks 应按耗时降序返回."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = _result("b", start, 3.0)
report.results["c"] = _result("c", start, 2.0)
graph = px.Graph.from_specs([_spec("a"), _spec("b"), _spec("c")])
profile = ProfileReport.from_report(report, graph)
top3 = profile.top_bottlenecks(3)
assert len(top3) == 3
assert top3[0].name == "b"
assert top3[1].name == "c"
assert top3[2].name == "a"
def test_top_bottlenecks_zero_or_negative(self) -> None:
"""n <= 0 应返回空元组."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
graph = px.Graph.from_specs([_spec("a")])
profile = ProfileReport.from_report(report, graph)
assert profile.top_bottlenecks(0) == ()
assert profile.top_bottlenecks(-1) == ()
def test_critical_tasks(self) -> None:
"""critical_tasks 应返回关键路径上的任务(按路径顺序)."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = _result("b", start + timedelta(seconds=1), 3.0)
report.results["c"] = _result("c", start + timedelta(seconds=1), 1.0)
report.results["d"] = _result("d", start + timedelta(seconds=4), 1.0)
graph = px.Graph.from_specs([
_spec("a"),
_spec("b", deps=("a",)),
_spec("c", deps=("a",)),
_spec("d", deps=("b", "c")),
])
profile = ProfileReport.from_report(report, graph)
# 关键路径 a -> b -> d
critical = profile.critical_tasks()
assert len(critical) == 3
assert [t.name for t in critical] == ["a", "b", "d"]
def test_failed_tasks(self) -> None:
"""failed_tasks 应返回 FAILED 状态的任务."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0, status=TaskStatus.FAILED)
report.results["b"] = _result("b", start, 1.0)
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
profile = ProfileReport.from_report(report, graph)
failed = profile.failed_tasks()
assert len(failed) == 1
assert failed[0].name == "a"
def test_skipped_tasks(self) -> None:
"""skipped_tasks 应返回 SKIPPED 状态的任务."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
report.results["b"] = _skipped_result("b")
graph = px.Graph.from_specs([_spec("a"), _spec("b")])
profile = ProfileReport.from_report(report, graph)
skipped = profile.skipped_tasks()
assert len(skipped) == 1
assert skipped[0].name == "b"
class TestOutputFormats:
"""测试输出格式."""
def test_to_dict_structure(self) -> None:
"""to_dict 应返回包含所有字段的字典."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.5)
graph = px.Graph.from_specs([_spec("a")])
profile = ProfileReport.from_report(report, graph)
d = profile.to_dict()
assert "tasks" in d
assert "total_duration_seconds" in d
assert "critical_path_duration_seconds" in d
assert "critical_path" in d
assert "avg_parallelism" in d
assert "peak_parallelism" in d
assert "parallelism_efficiency" in d
assert "bottlenecks" in d
assert len(d["tasks"]) == 1
assert d["tasks"][0]["name"] == "a"
assert d["tasks"][0]["status"] == "success"
assert d["tasks"][0]["duration_seconds"] == 1.5
assert d["tasks"][0]["is_on_critical_path"] is True
def test_describe_contains_key_sections(self) -> None:
"""describe 应包含关键章节标题."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
graph = px.Graph.from_specs([_spec("a")])
profile = ProfileReport.from_report(report, graph)
text = profile.describe()
assert "PyFlowX 性能剖面报告" in text
assert "【图级指标】" in text
assert "【关键路径】" in text
assert "【Top" in text
assert "【全部任务】" in text
assert "a" in text
def test_describe_empty_report(self) -> None:
"""空报告的 describe 应不崩溃且包含章节标题."""
report = px.RunReport()
graph = px.Graph()
profile = ProfileReport.from_report(report, graph)
text = profile.describe()
assert "【图级指标】" in text
assert "(无)" in text
def test_repr(self) -> None:
"""__repr__ 应包含关键指标."""
start = datetime(2024, 1, 1, 0, 0, 0)
report = px.RunReport()
report.results["a"] = _result("a", start, 1.0)
graph = px.Graph.from_specs([_spec("a")])
profile = ProfileReport.from_report(report, graph)
r = repr(profile)
assert "ProfileReport" in r
assert "tasks=1" in r
assert "total=1.000s" in r
def test_task_profile_to_dict(self) -> None:
"""TaskProfile.to_dict 应返回正确字段."""
tp = TaskProfile(
name="x",
status=TaskStatus.SUCCESS,
duration=1.5,
attempts=2,
wait_time=0.3,
is_on_critical_path=True,
deps=("a", "b"),
)
d = tp.to_dict()
assert d["name"] == "x"
assert d["status"] == "success"
assert d["duration_seconds"] == 1.5
assert d["attempts"] == 2
assert d["wait_time_seconds"] == 0.3
assert d["is_on_critical_path"] is True
assert d["deps"] == ["a", "b"]
class TestIntegrationWithRun:
"""与真实 run() 集成测试."""
def test_profile_from_real_run(self) -> None:
"""从真实 run() 结果构建剖面."""
import time
def slow() -> int:
time.sleep(0.01) # 确保任务有实际耗时,避免 duration 极小导致并行度计算为 0
return 1
graph = px.Graph.from_specs([
px.TaskSpec("a", slow),
px.TaskSpec("b", slow, depends_on=("a",)),
px.TaskSpec("c", slow, depends_on=("a",)),
])
report = px.run(graph, strategy="sequential")
profile = ProfileReport.from_report(report, graph)
assert len(profile.tasks) == 3
# sequential 策略下应为串行,duration > 0
assert profile.critical_path_duration > 0
# sequential 策略下并行度应为 1
assert profile.peak_parallelism == 1
def test_profile_from_thread_run(self) -> None:
"""从 thread 策略 run() 结果构建剖面,验证并行度 > 1."""
import time
def slow() -> int:
time.sleep(0.05)
return 1
graph = px.Graph.from_specs([
px.TaskSpec("a", slow),
px.TaskSpec("b", slow),
px.TaskSpec("c", slow),
])
report = px.run(graph, strategy="thread", max_workers=3)
profile = ProfileReport.from_report(report, graph)
# 三个任务并行,峰值应 >= 2(可能因调度时机不到 3)
assert profile.peak_parallelism >= 2
assert profile.critical_path_duration > 0
+2 -2
View File
@@ -193,8 +193,8 @@ def test_should_execute_skip_if_missing_cmd_not_found() -> None:
def test_should_execute_skip_if_missing_cmd_found() -> None: def test_should_execute_skip_if_missing_cmd_found() -> None:
"""skip_if_missing 但命令存在时应执行.""" """skip_if_missing 但命令存在时应执行."""
# 使用 Python 作为已安装的命令 # 使用 Python 作为已安装的命令Windows 上 echo 是 shell 内置,shutil.which 找不到)
spec = TaskSpec("a", cmd=["echo"], skip_if_missing=True) # echo 应存在 spec = TaskSpec("a", cmd=["python"], skip_if_missing=True) # python 应存在
should_run, reason = spec.should_execute({}) should_run, reason = spec.should_execute({})
assert should_run is True assert should_run is True
assert reason is None assert reason is None
Generated
+1 -1
View File
@@ -5603,7 +5603,7 @@ pycountry = [
[[package]] [[package]]
name = "pyflowx" name = "pyflowx"
version = "0.2.11" version = "0.2.13"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "graphlib-backport", marker = "python_full_version < '3.9'" }, { name = "graphlib-backport", marker = "python_full_version < '3.9'" },