chore: 版本升级到0.1.3并批量优化代码

变更包括:
1. 更新pyproject.toml行长度限制为120
2. 简化多处异常提示字符串的换行写法
3. 批量使用Any类型泛型优化类型标注
4. 重构cli/pymake.py的配置与任务定义
5. 删除冗余的测试代码与废弃的pymake测试文件
6. 修复示例代码的类型注解
This commit is contained in:
2026-06-21 14:58:19 +08:00
parent febcd90a31
commit cd38e1246a
14 changed files with 202 additions and 716 deletions
+1 -1
View File
@@ -32,4 +32,4 @@
"python.testing.pytestEnabled": true, "python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false, "python.testing.unittestEnabled": false,
"ruff.importStrategy": "fromEnvironment" "ruff.importStrategy": "fromEnvironment"
} }
+1 -1
View File
@@ -92,7 +92,7 @@ typeCheckingMode = "basic" # 类型检查严格度:off / basi
# Ruff 配置 - 与 .pre-commit-config.yaml 保持一致 # Ruff 配置 - 与 .pre-commit-config.yaml 保持一致
[tool.ruff] [tool.ruff]
target-version = "py38" target-version = "py38"
line-length = 88 line-length = 120
[tool.ruff.lint] [tool.ruff.lint]
select = [ select = [
+117 -372
View File
@@ -6,50 +6,11 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path
import pyflowx as px import pyflowx as px
from pyflowx.conditions import BuiltinConditions, Constants from pyflowx.conditions import BuiltinConditions, Constants
class PymakeConfig: def maturin_build_cmd() -> list[str]:
"""PyMake 配置类."""
# 项目根目录
PROJECT_ROOT: str = str(Path(__file__).parent.parent.parent.parent)
CORE_DIR: str = f"{PROJECT_ROOT}/bitool-core"
CORE_PATTERN: str = f"{CORE_DIR}/target/bitool_core-*-cp*.whl"
TIMEOUT: int = 600
# Python 构建
BUILD_TOOL: str = "uv"
BUILD_COMMAND: list[str] = [BUILD_TOOL, "build"]
# Rust 构建 (maturin)
MATURIN_TOOL: str = "maturin"
MATURIN_BUILD_COMMAND: list[str] = ["maturin", "build", "-r"]
MATURIN_DEV_COMMAND: list[str] = ["maturin", "develop"]
MATURIN_BUILD_OPTIONS_WIN7: list[str] = [
"--target",
"x86_64-win7-windows-msvc",
"-Zbuild-std",
"-i",
"python3.8",
]
# 文档
DOC_BUILD_TOOL: str = "sphinx-build"
DOC_BUILD_COMMAND: list[str] = ["sphinx-build", "-b", "html", "docs", "docs/_build"]
# 清理
DIRS_TO_IGNORE: list[str] = [".venv", ".git", ".tox"]
PYTHON_BUILD_DIRS: list[str] = ["dist", "build", "*.egg-info", "src/*.egg-info"]
conf = PymakeConfig()
def get_maturin_build_command() -> list[str]:
"""获取 maturin 构建命令(根据平台自动添加参数). """获取 maturin 构建命令(根据平台自动添加参数).
Returns Returns
@@ -57,336 +18,105 @@ def get_maturin_build_command() -> list[str]:
list[str] list[str]
完整的 maturin 构建命令列表. 完整的 maturin 构建命令列表.
""" """
base_cmd = conf.MATURIN_BUILD_COMMAND.copy() base_cmd = ["maturin", "build", "-r"].copy()
if Constants.IS_WINDOWS: if Constants.IS_WINDOWS:
base_cmd.extend(conf.MATURIN_BUILD_OPTIONS_WIN7) base_cmd.extend(
[
"--target",
"x86_64-win7-windows-msvc",
"-Zbuild-std",
"-i",
"python3.8",
]
)
return base_cmd return base_cmd
# 命令条件判断 def check(name: str) -> px.Condition:
MATURIN_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.MATURIN_TOOL) """检查指定工具是否已安装.
PYTEST_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("pytest")
UV_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.BUILD_TOOL)
HATCH_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("hatch")
RUFF_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("ruff")
GIT_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("git")
TOX_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("tox")
Returns
def build_graphs() -> dict[str, px.Graph]: -------
"""构建所有命令对应的任务流图. bool
如果已安装则返回 True,否则返回 False.
将原本的 CommandScheduler/RunCommand 模式转换为 Graph/TaskSpec 模式,
每个 Graph 是一个独立的任务流, 由 CliRunner 根据用户输入选择执行.
""" """
return { return BuiltinConditions.HAS_APP_INSTALLED(name)
# === 构建命令 ===
# 构建 Python 包
"b": px.Graph.from_specs( uv_build: px.TaskSpec = px.TaskSpec("uv_build", cmd=["uv", "build"], conditions=(check("uv"),))
[ maturin_build: px.TaskSpec = px.TaskSpec("maturin_build", cmd=maturin_build_cmd(), conditions=(check("maturin"),))
px.TaskSpec( uv_sync: px.TaskSpec = px.TaskSpec("uv_sync", cmd=["uv", "sync"], conditions=(check("uv"),))
"uv_build", git_clean: px.TaskSpec = px.TaskSpec("git_clean", cmd=["gitt", "c"], conditions=(check("gitt"),))
cmd=conf.BUILD_COMMAND, test: px.TaskSpec = px.TaskSpec(
conditions=(UV_CONDITION,), "test",
timeout=conf.TIMEOUT, cmd=[
), "pytest",
] "-m",
), "not slow",
# 构建 Rust 核心模块 "-n",
"bc": px.Graph.from_specs( "8",
[ "--dist",
px.TaskSpec( "loadfile",
"maturin_build", "--color=yes",
cmd=get_maturin_build_command(), "--durations=10",
cwd=Path(conf.CORE_DIR), ],
conditions=(MATURIN_CONDITION,), conditions=(check("pytest"),),
timeout=conf.TIMEOUT, )
), test_fast: px.TaskSpec = px.TaskSpec(
] "test_fast",
), cmd=[
# 构建双包(先 Rust 后 Python "pytest",
"ba": px.Graph.from_specs( "-m",
[ "not slow",
px.TaskSpec( "--dist",
"maturin_build", "loadfile",
cmd=get_maturin_build_command(), "--color=yes",
cwd=Path(conf.CORE_DIR), "--durations=10",
conditions=(MATURIN_CONDITION,), ],
timeout=conf.TIMEOUT, conditions=(check("pytest"),),
), )
px.TaskSpec( test_coverage: px.TaskSpec = px.TaskSpec(
"uv_build", "test_coverage",
cmd=conf.BUILD_COMMAND, cmd=[
conditions=(UV_CONDITION,), "pytest",
timeout=conf.TIMEOUT, "--cov",
depends_on=("maturin_build",), "-n",
), "8",
] "--dist",
), "loadfile",
# === 安装命令(开发模式) === "--tb=short",
# 安装 Rust 核心模块 "-v",
"ic": px.Graph.from_specs( "--color=yes",
[ "--durations=10",
px.TaskSpec( ],
"maturin_dev", conditions=(check("pytest"),),
cmd=conf.MATURIN_DEV_COMMAND, )
cwd=Path(conf.CORE_DIR), ruff_lint: px.TaskSpec = px.TaskSpec(
conditions=(MATURIN_CONDITION,), "lint",
), cmd=[
] "ruff",
), "check",
# 安装 Python 主包 "--fix",
"ip": px.Graph.from_specs( "--unsafe-fixes",
[ ],
px.TaskSpec( conditions=(check("ruff"),),
"uv_install", )
cmd=["uv", "pip", "install", "-e", "."], mypy_check: px.TaskSpec = px.TaskSpec("typecheck", cmd=["mypy", "."], conditions=(check("mypy"),))
conditions=(UV_CONDITION,), ty_check: px.TaskSpec = px.TaskSpec("ty_check", cmd=["ty", "check", "."], conditions=(check("ty"),))
), doc: px.TaskSpec = px.TaskSpec(
] "doc", cmd=["sphinx-build", "-b", "html", "docs", "docs/_build"], conditions=(check("sphinx-build"),)
), )
# 安装双包(开发模式) hatch_publish: px.TaskSpec = px.TaskSpec("publish_python", cmd=["hatch", "publish"], conditions=(check("hatch"),))
"ia": px.Graph.from_specs( twine_publish: px.TaskSpec = px.TaskSpec(
[ "twine_publish",
px.TaskSpec( cmd=[
"maturin_dev", "twine",
cmd=conf.MATURIN_DEV_COMMAND, "upload",
cwd=Path(conf.CORE_DIR), "--disable-progress-bar",
conditions=(MATURIN_CONDITION,), ],
), conditions=(check("twine"),),
px.TaskSpec( )
"uv_install", tox: px.TaskSpec = px.TaskSpec("tox", cmd=["tox", "-p", "auto"], conditions=(check("tox"),))
cmd=["uv", "pip", "install", "-e", "."],
conditions=(UV_CONDITION,),
depends_on=("maturin_dev",),
),
]
),
# === 清理命令 ===
# 清理 Python 构建产物
"cp": px.Graph.from_specs(
[
px.TaskSpec(
"git_clean_python",
cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE],
conditions=(GIT_CONDITION,),
),
]
),
# 清理 Rust 构建产物
"cc": px.Graph.from_specs(
[
px.TaskSpec(
"cargo_clean",
cmd=["cargo", "clean"],
cwd=Path(conf.CORE_DIR),
conditions=(MATURIN_CONDITION,),
),
]
),
# 清理所有构建产物
"ca": px.Graph.from_specs(
[
px.TaskSpec(
"cargo_clean",
cmd=["cargo", "clean"],
cwd=Path(conf.CORE_DIR),
conditions=(MATURIN_CONDITION,),
),
px.TaskSpec(
"git_clean",
cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE],
conditions=(GIT_CONDITION,),
),
]
),
# === 开发工具 ===
# 运行测试, 跳过 slow, 并行模式
"t": px.Graph.from_specs(
[
px.TaskSpec(
"pytest",
cmd=[
"pytest",
"-m",
"not slow",
"-n",
"8",
"--dist",
"loadfile",
"--color=yes",
"--durations=10",
],
conditions=(PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 运行测试, 非并行模式
"tf": px.Graph.from_specs(
[
px.TaskSpec(
"pytest",
cmd=[
"pytest",
"-m",
"not slow",
"--dist",
"loadfile",
"--color=yes",
"--durations=10",
],
conditions=(PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 运行测试并生成覆盖率报告, 跳过 slow, 并行模式
"tc": px.Graph.from_specs(
[
px.TaskSpec(
"pytest_cov",
cmd=[
"pytest",
"--cov",
"-n",
"auto",
"--dist",
"loadfile",
"--tb=short",
"-v",
"--color=yes",
"--durations=10",
],
conditions=(PYTEST_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 代码格式化与检查
"lint": px.Graph.from_specs(
[
px.TaskSpec(
"ruff_check",
cmd=[
"ruff",
"check",
"--fix",
"--unsafe-fixes",
],
conditions=(RUFF_CONDITION,),
timeout=conf.TIMEOUT,
cwd=Path(conf.PROJECT_ROOT),
),
]
),
# 类型检查
"typecheck": px.Graph.from_specs(
[
px.TaskSpec(
"ty_check",
cmd=["ty", "check", "src/bitool"],
conditions=(BuiltinConditions.HAS_APP_INSTALLED("ty"),),
),
]
),
# 构建文档
"doc": px.Graph.from_specs(
[
px.TaskSpec(
"sphinx_build",
cmd=conf.DOC_BUILD_COMMAND,
conditions=(
BuiltinConditions.HAS_APP_INSTALLED(conf.DOC_BUILD_TOOL),
),
),
]
),
# === 发布命令 ===
# 发布 Python 主包到 PyPI
"pb": px.Graph.from_specs(
[
px.TaskSpec(
"publish_python",
cmd=["hatch", "publish"],
cwd=Path(conf.PROJECT_ROOT),
conditions=(HATCH_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 发布所有包(先 Rust 后 Python
"pba": px.Graph.from_specs(
[
px.TaskSpec(
"publish_rust",
cmd=[
"twine",
"upload",
"--disable-progress-bar",
conf.CORE_PATTERN,
],
cwd=Path(conf.CORE_DIR),
conditions=(MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
px.TaskSpec(
"publish_python",
cmd=["hatch", "publish"],
cwd=Path(conf.PROJECT_ROOT),
conditions=(HATCH_CONDITION,),
timeout=conf.TIMEOUT,
depends_on=("publish_rust",),
),
]
),
# 发布 Rust 核心模块 (maturin publish)
"pbc": px.Graph.from_specs(
[
px.TaskSpec(
"publish_rust",
cmd=["maturin", "publish"],
cwd=Path(conf.CORE_DIR),
conditions=(MATURIN_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# === 多版本测试命令 ===
# 运行多版本 Python 测试 (tox)
"tox": px.Graph.from_specs(
[
px.TaskSpec(
"tox_run",
cmd=["tox", "-p", "auto"],
conditions=(TOX_CONDITION,),
timeout=conf.TIMEOUT,
),
]
),
# 安装多版本 Python (仅安装不测试)
"tox_install": px.Graph.from_specs(
[
px.TaskSpec(
"uv_python_install",
cmd=[
"uv",
"python",
"install",
"3.8",
"3.9",
"3.10",
"3.11",
"3.12",
"3.13",
"3.14",
],
conditions=(UV_CONDITION,),
timeout=600,
),
]
),
}
def main(): def main():
@@ -401,20 +131,17 @@ def main():
pymake ba - 构建所有包 (先 Rust 后 Python) pymake ba - 构建所有包 (先 Rust 后 Python)
📦 安装命令 (开发模式): 📦 安装命令 (开发模式):
pymake ic - 安装 Rust 核心模块 (maturin develop) pymake sync - 安装依赖包 (uv sync)
pymake ip - 安装 Python 主包 (uv pip install -e .)
pymake ia - 安装所有包 (开发模式,推荐)
🧹 清理命令: 🧹 清理命令:
pymake cp - 清理 Python 构建产物 pymake c - 清理所有构建产物
pymake cc - 清理 Rust 构建产物 (cargo clean)
pymake ca - 清理所有构建产物
🛠️ 开发工具: 🛠️ 开发工具:
pymake t - 运行测试 (pytest) pymake t - 运行测试 (pytest)
pymake tc - 运行测试并生成覆盖率报告 pymake tc - 运行测试并生成覆盖率报告
pymake tf - 运行快速测试 (pytest -m not slow)
pymake lint - 代码格式化与检查 (ruff) pymake lint - 代码格式化与检查 (ruff)
pymake typecheck - 类型检查 (ty) pymake type - 类型检查 (mypy, ty)
pymake doc - 构建文档 (sphinx) pymake doc - 构建文档 (sphinx)
🔬 多版本测试: 🔬 多版本测试:
@@ -445,6 +172,24 @@ def main():
runner = px.CliRunner( runner = px.CliRunner(
strategy="sequential", strategy="sequential",
description="PyMake - Python 构建工具 (替代 Makefile)", description="PyMake - Python 构建工具 (替代 Makefile)",
graphs=build_graphs(), # type: ignore[reportArgumentType] graphs={
# 构建命令
"b": px.Graph.from_specs([uv_build]),
"bc": px.Graph.from_specs([maturin_build]),
"ba": px.Graph.from_specs([uv_build, maturin_build]),
# 安装命令
"sync": px.Graph.from_specs([uv_sync]),
# 清理命令
"c": px.Graph.from_specs([git_clean]),
# 开发工具
"t": px.Graph.from_specs([test]),
"tc": px.Graph.from_specs([test, test_coverage]),
"tf": px.Graph.from_specs([test_fast]),
"lint": px.Graph.from_specs([ruff_lint]),
"type": px.Graph.from_specs([mypy_check, ty_check]),
"doc": px.Graph.from_specs([doc]),
"pb": px.Graph.from_specs([twine_publish, hatch_publish]),
"tox": px.Graph.from_specs([tox]),
},
) )
runner.run_cli() runner.run_cli()
+3 -2
View File
@@ -10,11 +10,12 @@ Shows:
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from typing import Any
import pyflowx as px import pyflowx as px
async def fetch_user(uid: int) -> dict[str, object]: async def fetch_user(uid: int) -> dict[str, Any]:
await asyncio.sleep(0.2) await asyncio.sleep(0.2)
return {"id": uid, "name": f"User{uid}"} return {"id": uid, "name": f"User{uid}"}
@@ -25,7 +26,7 @@ async def fetch_posts(uid: int) -> list[int]:
# Context annotation → receives the full mapping of upstream results. # Context annotation → receives the full mapping of upstream results.
def aggregate(ctx: px.Context) -> dict[str, object]: def aggregate(ctx: px.Context) -> dict[str, Any]:
return dict(ctx) return dict(ctx)
+21 -41
View File
@@ -35,14 +35,14 @@ EventCallback = Callable[[TaskEvent], None]
Strategy = Literal["sequential", "thread", "async"] Strategy = Literal["sequential", "thread", "async"]
def _is_async_fn(spec: TaskSpec[object]) -> bool: def _is_async_fn(spec: TaskSpec[Any]) -> bool:
"""判断 ``spec.effective_fn`` 是否为协程函数。""" """判断 ``spec.effective_fn`` 是否为协程函数。"""
return inspect.iscoroutinefunction(spec.effective_fn) return inspect.iscoroutinefunction(spec.effective_fn)
def _emit( def _emit(
on_event: EventCallback | None, on_event: EventCallback | None,
result: TaskResult[object], result: TaskResult[Any],
) -> None: ) -> None:
"""若注册了回调则触发一个观察者事件。""" """若注册了回调则触发一个观察者事件。"""
if on_event is None: if on_event is None:
@@ -58,9 +58,7 @@ def _emit(
) )
def _log_retry( def _log_retry(spec: TaskSpec[Any], attempts: int, max_attempts: int, exc: BaseException) -> None:
spec: TaskSpec[object], attempts: int, max_attempts: int, exc: BaseException
) -> None:
"""记录重试日志(sync 与 async 共享,便于测试覆盖)。""" """记录重试日志(sync 与 async 共享,便于测试覆盖)。"""
logger.warning( logger.warning(
"task %r failed (attempt %d/%d): %r; retrying", "task %r failed (attempt %d/%d): %r; retrying",
@@ -71,7 +69,7 @@ def _log_retry(
) )
def _finalize_failure(result: TaskResult[object], layer_idx: int | None) -> None: def _finalize_failure(result: TaskResult[Any], layer_idx: int | None) -> None:
"""标记任务为 FAILED 并抛出 TaskFailedError。""" """标记任务为 FAILED 并抛出 TaskFailedError。"""
result.status = TaskStatus.FAILED result.status = TaskStatus.FAILED
result.finished_at = datetime.now() result.finished_at = datetime.now()
@@ -84,12 +82,12 @@ def _finalize_failure(result: TaskResult[object], layer_idx: int | None) -> None
def _run_sync_with_retry( def _run_sync_with_retry(
spec: TaskSpec[object], spec: TaskSpec[Any],
context: Mapping[str, Any], context: Mapping[str, Any],
layer_idx: int | None, layer_idx: int | None,
) -> TaskResult[object]: ) -> TaskResult[Any]:
"""执行同步任务并带重试;返回填充好的 TaskResult。""" """执行同步任务并带重试;返回填充好的 TaskResult。"""
result: TaskResult[object] = TaskResult(spec=spec) result: TaskResult[Any] = TaskResult(spec=spec)
# 检查条件是否满足 # 检查条件是否满足
if spec.conditions and not spec.should_execute(): if spec.conditions and not spec.should_execute():
@@ -118,12 +116,12 @@ def _run_sync_with_retry(
async def _run_async_with_retry( async def _run_async_with_retry(
spec: TaskSpec[object], spec: TaskSpec[Any],
context: Mapping[str, Any], context: Mapping[str, Any],
layer_idx: int | None, layer_idx: int | None,
) -> TaskResult[object]: ) -> TaskResult[Any]:
"""在事件循环上执行任务(同步或异步)并带重试。""" """在事件循环上执行任务(同步或异步)并带重试。"""
result: TaskResult[object] = TaskResult(spec=spec) result: TaskResult[Any] = TaskResult[Any](spec=spec)
# 检查条件是否满足 # 检查条件是否满足
if spec.conditions and not spec.should_execute(): if spec.conditions and not spec.should_execute():
@@ -152,9 +150,7 @@ async def _run_async_with_retry(
return spec.effective_fn(*args, **kwargs) return spec.effective_fn(*args, **kwargs)
if spec.timeout is not None: if spec.timeout is not None:
result.value = await asyncio.wait_for( result.value = await asyncio.wait_for(loop.run_in_executor(None, fn_call), timeout=spec.timeout)
loop.run_in_executor(None, fn_call), timeout=spec.timeout
)
else: else:
result.value = await loop.run_in_executor(None, fn_call) result.value = await loop.run_in_executor(None, fn_call)
result.status = TaskStatus.SUCCESS result.status = TaskStatus.SUCCESS
@@ -182,13 +178,11 @@ async def _run_async_with_retry(
# 层驱动器 # 层驱动器
# ---------------------------------------------------------------------- # # ---------------------------------------------------------------------- #
def _build_context( def _build_context(
spec: TaskSpec[object], spec: TaskSpec[Any],
global_context: Mapping[str, Any], global_context: Mapping[str, Any],
) -> Mapping[str, Any]: ) -> Mapping[str, Any]:
"""将全局上下文限制为本任务的依赖。""" """将全局上下文限制为本任务的依赖。"""
return { return {dep: global_context[dep] for dep in spec.depends_on if dep in global_context}
dep: global_context[dep] for dep in spec.depends_on if dep in global_context
}
def _execute_layer_sequential( def _execute_layer_sequential(
@@ -235,9 +229,7 @@ def _execute_layer_threaded(
if backend.has(name): if backend.has(name):
cached = backend.get(name) cached = backend.get(name)
context[name] = cached context[name] = cached
result = TaskResult( result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached)
spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached
)
report.results[name] = result report.results[name] = result
_emit(on_event, result) _emit(on_event, result)
else: else:
@@ -247,7 +239,7 @@ def _execute_layer_threaded(
return return
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool: with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
future_to_name: dict[concurrent.futures.Future[TaskResult[object]], str] = {} future_to_name: dict[concurrent.futures.Future[TaskResult[Any]], str] = {}
for name in to_run: for name in to_run:
spec = graph.spec(name) spec = graph.spec(name)
# 为本任务快照上下文以避免竞态。 # 为本任务快照上下文以避免竞态。
@@ -279,9 +271,7 @@ async def _execute_layer_async(
if backend.has(name): if backend.has(name):
cached = backend.get(name) cached = backend.get(name)
context[name] = cached context[name] = cached
result = TaskResult( result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached)
spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached
)
report.results[name] = result report.results[name] = result
_emit(on_event, result) _emit(on_event, result)
else: else:
@@ -394,9 +384,7 @@ def run(
return RunReport(success=True) return RunReport(success=True)
# verbose 模式下包装事件回调 # verbose 模式下包装事件回调
effective_callback: EventCallback | None = ( effective_callback: EventCallback | None = _make_verbose_callback(on_event) if verbose else on_event
_make_verbose_callback(on_event) if verbose else on_event
)
backend = resolve_backend(state) backend = resolve_backend(state)
report = RunReport() report = RunReport()
@@ -404,13 +392,9 @@ def run(
try: try:
if strategy == "sequential": if strategy == "sequential":
_drive_sequential( _drive_sequential(graph, layers, context, report, backend, effective_callback)
graph, layers, context, report, backend, effective_callback
)
elif strategy == "thread": elif strategy == "thread":
_drive_threaded( _drive_threaded(graph, layers, context, report, backend, effective_callback, max_workers)
graph, layers, context, report, backend, effective_callback, max_workers
)
else: else:
_drive_async(graph, layers, context, report, backend, effective_callback) _drive_async(graph, layers, context, report, backend, effective_callback)
except TaskFailedError: except TaskFailedError:
@@ -452,9 +436,7 @@ def _drive_threaded(
) -> None: ) -> None:
for idx, layer in enumerate(layers, 1): for idx, layer in enumerate(layers, 1):
workers = max_workers or max(1, min(32, len(layer))) workers = max_workers or max(1, min(32, len(layer)))
_execute_layer_threaded( _execute_layer_threaded(layer, graph, context, report, backend, idx, on_event, workers)
layer, graph, context, report, backend, idx, on_event, workers
)
def _drive_async( def _drive_async(
@@ -477,6 +459,4 @@ async def _async_drive(
on_event: EventCallback | None, on_event: EventCallback | None,
) -> None: ) -> None:
for idx, layer in enumerate(layers, 1): for idx, layer in enumerate(layers, 1):
await _execute_layer_async( await _execute_layer_async(layer, graph, context, report, backend, idx, on_event)
layer, graph, context, report, backend, idx, on_event
)
+13 -17
View File
@@ -8,7 +8,7 @@ from __future__ import annotations
import sys import sys
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Iterable, Mapping, Sequence from typing import Any, Iterable, Mapping, Sequence
from .errors import CycleError, DuplicateTaskError, MissingDependencyError from .errors import CycleError, DuplicateTaskError, MissingDependencyError
from .task import TaskSpec from .task import TaskSpec
@@ -36,13 +36,13 @@ class Graph:
这使图可安全重复运行并在线程间共享。 这使图可安全重复运行并在线程间共享。
""" """
specs: dict[str, TaskSpec[object]] = field(default_factory=dict) specs: dict[str, TaskSpec[Any]] = field(default_factory=dict)
deps: dict[str, tuple[str, ...]] = field(default_factory=dict) deps: dict[str, tuple[str, ...]] = field(default_factory=dict)
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# 构建 # 构建
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
def add(self, spec: TaskSpec[object]) -> Graph: def add(self, spec: TaskSpec[Any]) -> Graph:
"""注册一个任务 spec,并即时校验。 """注册一个任务 spec,并即时校验。
返回 ``self`` 以支持链式调用,但推荐入口是 :meth:`from_specs` 返回 ``self`` 以支持链式调用,但推荐入口是 :meth:`from_specs`
@@ -57,7 +57,7 @@ class Graph:
return self return self
@classmethod @classmethod
def from_specs(cls, specs: Iterable[TaskSpec[object]]) -> Graph: def from_specs(cls, specs: Iterable[TaskSpec[Any]]) -> Graph:
"""从可迭代的 task spec 构建图。 """从可迭代的 task spec 构建图。
先收集所有 spec,再统一校验。这意味着任务可以引用*后出现*的 先收集所有 spec,再统一校验。这意味着任务可以引用*后出现*的
@@ -108,7 +108,7 @@ class Graph:
"""所有已注册任务名(按插入顺序)。""" """所有已注册任务名(按插入顺序)。"""
return list(self.specs.keys()) return list(self.specs.keys())
def spec(self, name: str) -> TaskSpec[object]: def spec(self, name: str) -> TaskSpec[Any]:
"""返回 ``name`` 的 spec;不存在则 ``KeyError``。""" """返回 ``name`` 的 spec;不存在则 ``KeyError``。"""
return self.specs[name] return self.specs[name]
@@ -116,7 +116,7 @@ class Graph:
"""``name`` 的直接前驱。""" """``name`` 的直接前驱。"""
return self.deps[name] return self.deps[name]
def all_specs(self) -> Mapping[str, TaskSpec[object]]: def all_specs(self) -> Mapping[str, TaskSpec[Any]]:
"""name -> spec 的只读视图。""" """name -> spec 的只读视图。"""
return self.specs return self.specs
@@ -152,16 +152,14 @@ class Graph:
DAG 的切片。 DAG 的切片。
""" """
wanted: set[str] = set(tags) wanted: set[str] = set(tags)
kept: list[TaskSpec[object]] = [] kept: list[TaskSpec[Any]] = []
for spec in self.specs.values(): for spec in self.specs.values():
if wanted & set(spec.tags): if wanted & set(spec.tags):
pruned_deps = tuple( pruned_deps = tuple(
d d for d in spec.depends_on if d in self.specs and (wanted & set(self.specs[d].tags))
for d in spec.depends_on
if d in self.specs and (wanted & set(self.specs[d].tags))
) )
kept.append( kept.append(
TaskSpec( TaskSpec[Any](
name=spec.name, name=spec.name,
fn=spec.fn, fn=spec.fn,
cmd=spec.cmd, cmd=spec.cmd,
@@ -183,12 +181,12 @@ class Graph:
for n in wanted: for n in wanted:
if n not in self.specs: if n not in self.specs:
raise KeyError(f"Unknown task name: {n!r}") raise KeyError(f"Unknown task name: {n!r}")
kept: list[TaskSpec[object]] = [] kept: list[TaskSpec[Any]] = []
for spec in self.specs.values(): for spec in self.specs.values():
if spec.name in wanted: if spec.name in wanted:
pruned_deps = tuple(d for d in spec.depends_on if d in wanted) pruned_deps = tuple(d for d in spec.depends_on if d in wanted)
kept.append( kept.append(
TaskSpec[object]( TaskSpec[Any](
name=spec.name, name=spec.name,
fn=spec.fn, fn=spec.fn,
cmd=spec.cmd, cmd=spec.cmd,
@@ -216,9 +214,7 @@ class Graph:
valid = {"TD", "TB", "BT", "LR", "RL"} valid = {"TD", "TB", "BT", "LR", "RL"}
orientation = orientation.upper() orientation = orientation.upper()
if orientation not in valid: if orientation not in valid:
raise ValueError( raise ValueError(f"Invalid orientation {orientation!r}; expected one of {sorted(valid)}.")
f"Invalid orientation {orientation!r}; expected one of {sorted(valid)}."
)
lines: list[str] = [f"graph {orientation}"] lines: list[str] = [f"graph {orientation}"]
for name in self.specs: for name in self.specs:
lines.append(f' {name}["{name}"]') lines.append(f' {name}["{name}"]')
@@ -243,5 +239,5 @@ class Graph:
def __len__(self) -> int: def __len__(self) -> int:
return len(self.specs) return len(self.specs)
def __contains__(self, name: object) -> bool: def __contains__(self, name: Any) -> bool:
return name in self.specs return name in self.specs
+5 -9
View File
@@ -24,7 +24,7 @@ class RunReport:
当且仅当所有非跳过任务都以 ``SUCCESS`` 结束时为 ``True``。 当且仅当所有非跳过任务都以 ``SUCCESS`` 结束时为 ``True``。
""" """
results: dict[str, TaskResult[object]] = field(default_factory=dict) results: dict[str, TaskResult[Any]] = field(default_factory=dict)
success: bool = True success: bool = True
# ---- 类型化访问 --------------------------------------------------- # # ---- 类型化访问 --------------------------------------------------- #
@@ -36,11 +36,11 @@ class RunReport:
""" """
return self.results[name].value return self.results[name].value
def result_of(self, name: str) -> TaskResult[object]: def result_of(self, name: str) -> TaskResult[Any]:
"""返回 ``name`` 的完整 :class:`TaskResult`。""" """返回 ``name`` 的完整 :class:`TaskResult`。"""
return self.results[name] return self.results[name]
def __contains__(self, name: object) -> bool: def __contains__(self, name: Any) -> bool:
return name in self.results return name in self.results
def __iter__(self) -> Iterator[str]: def __iter__(self) -> Iterator[str]:
@@ -67,9 +67,7 @@ class RunReport:
def failed_tasks(self) -> list[str]: def failed_tasks(self) -> list[str]:
"""以 FAILED 状态结束的任务名列表。""" """以 FAILED 状态结束的任务名列表。"""
return [ return [name for name, r in self.results.items() if r.status == TaskStatus.FAILED]
name for name, r in self.results.items() if r.status == TaskStatus.FAILED
]
def describe(self) -> str: def describe(self) -> str:
"""用于调试的人类可读多行报告。""" """用于调试的人类可读多行报告。"""
@@ -77,7 +75,5 @@ class RunReport:
for name, r in self.results.items(): for name, r in self.results.items():
dur = f"{r.duration:.3f}s" if r.duration is not None else "-" dur = f"{r.duration:.3f}s" if r.duration is not None else "-"
err = f" error={r.error!r}" if r.error else "" err = f" error={r.error!r}" if r.error else ""
lines.append( lines.append(f" {name}: {r.status.value} ({dur} attempts={r.attempts}){err}")
f" {name}: {r.status.value} ({dur} attempts={r.attempts}){err}"
)
return "\n".join(lines) return "\n".join(lines)
+4 -10
View File
@@ -15,7 +15,7 @@ import argparse
import enum import enum
import sys import sys
from dataclasses import dataclass, field, replace from dataclasses import dataclass, field, replace
from typing import Sequence, get_args from typing import Any, Sequence, get_args
from .errors import PyFlowXError from .errors import PyFlowXError
from .executors import Strategy, run from .executors import Strategy, run
@@ -51,7 +51,7 @@ def _apply_verbose_to_graph(graph: Graph, verbose: bool) -> Graph:
Graph Graph
所有 spec 的 verbose 字段已更新的新图. 所有 spec 的 verbose 字段已更新的新图.
""" """
new_specs: list[TaskSpec[object]] = [] new_specs: list[TaskSpec[Any]] = []
for spec in graph.all_specs().values(): for spec in graph.all_specs().values():
if spec.verbose == verbose: if spec.verbose == verbose:
new_specs.append(spec) new_specs.append(spec)
@@ -116,9 +116,7 @@ class CliRunner:
for name, graph in self.graphs.items(): for name, graph in self.graphs.items():
if not isinstance(graph, Graph): if not isinstance(graph, Graph):
raise TypeError( raise TypeError(f"CliRunner 命令 {name!r} 的值必须是 Graph 实例, 实际是 {type(graph).__name__}")
f"CliRunner 命令 {name!r} 的值必须是 Graph 实例, 实际是 {type(graph).__name__}"
)
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# 内省 # 内省
@@ -249,11 +247,7 @@ class CliRunner:
dry_run=parsed.dry_run, dry_run=parsed.dry_run,
verbose=verbose, verbose=verbose,
) )
return ( return CliExitCode.SUCCESS.value if report.success else CliExitCode.FAILURE.value
CliExitCode.SUCCESS.value
if report.success
else CliExitCode.FAILURE.value
)
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n操作已取消", file=sys.stderr) print("\n操作已取消", file=sys.stderr)
return CliExitCode.INTERRUPTED.value return CliExitCode.INTERRUPTED.value
+3 -9
View File
@@ -188,9 +188,7 @@ class TaskSpec(Generic[T]):
except FileNotFoundError: except FileNotFoundError:
raise RuntimeError(f"命令未找到: {cmd_str}") from None raise RuntimeError(f"命令未找到: {cmd_str}") from None
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
raise RuntimeError( raise RuntimeError(f"命令执行超时: {cmd_str} ({timeout}s)") from None
f"命令执行超时: {cmd_str} ({timeout}s)"
) from None
except OSError as e: except OSError as e:
raise RuntimeError(f"命令执行异常: {cmd_str}: {e}") from e raise RuntimeError(f"命令执行异常: {cmd_str}: {e}") from e
@@ -230,9 +228,7 @@ class TaskSpec(Generic[T]):
except FileNotFoundError: except FileNotFoundError:
raise RuntimeError(f"Shell 命令未找到: {cmd}") from None raise RuntimeError(f"Shell 命令未找到: {cmd}") from None
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
raise RuntimeError( raise RuntimeError(f"Shell 命令执行超时: {cmd} ({timeout}s)") from None
f"Shell 命令执行超时: {cmd} ({timeout}s)"
) from None
except OSError as e: except OSError as e:
raise RuntimeError(f"Shell 命令执行异常: {cmd}: {e}") from e raise RuntimeError(f"Shell 命令执行异常: {cmd}: {e}") from e
@@ -253,9 +249,7 @@ class TaskSpec(Generic[T]):
if callable(cmd): if callable(cmd):
return cmd # type: ignore[return-value] return cmd # type: ignore[return-value]
raise TypeError( raise TypeError(f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}")
f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}"
)
def should_execute(self) -> bool: def should_execute(self) -> bool:
"""检查任务是否应该执行. """检查任务是否应该执行.
-165
View File
@@ -1,165 +0,0 @@
"""Tests for pymake CLI."""
from pyflowx.cli.pymake import build_graphs, conf, get_maturin_build_command
def test_pymake_config_attributes():
"""Test PymakeConfig has expected attributes."""
assert hasattr(conf, "PROJECT_ROOT")
assert hasattr(conf, "BUILD_TOOL")
assert hasattr(conf, "BUILD_COMMAND")
assert hasattr(conf, "MATURIN_TOOL")
assert hasattr(conf, "MATURIN_BUILD_COMMAND")
assert hasattr(conf, "MATURIN_DEV_COMMAND")
assert hasattr(conf, "TIMEOUT")
def test_pymake_config_values():
"""Test PymakeConfig values are correct."""
assert conf.BUILD_TOOL == "uv"
assert conf.BUILD_COMMAND == ["uv", "build"]
assert conf.MATURIN_TOOL == "maturin"
assert conf.TIMEOUT == 600
def test_get_maturin_build_command_basic():
"""Test get_maturin_build_command returns base command."""
cmd = get_maturin_build_command()
assert "maturin" in cmd
assert "build" in cmd
assert "-r" in cmd
def testbuild_graphs_returns_dict():
"""Test build_graphs returns a dictionary."""
graphs = build_graphs()
assert isinstance(graphs, dict)
assert len(graphs) > 0
def testbuild_graphs_has_expected_commands():
"""Test build_graphs has expected command keys."""
graphs = build_graphs()
expected_commands = [
"b",
"bc",
"ba",
"ic",
"ip",
"ia",
"cp",
"cc",
"ca",
"t",
"lint",
]
for cmd in expected_commands:
assert cmd in graphs, f"Expected command '{cmd}' not found in graphs"
def testbuild_graphs_values_are_graphs():
"""Test build_graphs values are Graph instances."""
import pyflowx as px
graphs = build_graphs()
for name, graph in graphs.items():
assert isinstance(graph, px.Graph), (
f"Graph for command '{name}' is not a Graph instance"
)
def test_build_command_graph_structure():
"""Test 'b' command graph has correct structure."""
graphs = build_graphs()
graph = graphs["b"]
assert len(graph.all_specs()) == 1
spec = graph.spec("uv_build")
assert spec.cmd == conf.BUILD_COMMAND
def test_build_all_command_graph_structure():
"""Test 'ba' command graph has correct dependencies."""
graphs = build_graphs()
graph = graphs["ba"]
specs = graph.all_specs()
assert len(specs) == 2
# Check dependency
uv_build_spec = graph.spec("uv_build")
assert "maturin_build" in uv_build_spec.depends_on
def test_maturin_build_command_graph_structure():
"""Test 'bc' command graph has correct structure."""
graphs = build_graphs()
graph = graphs["bc"]
specs = graph.all_specs()
assert len(specs) == 1
spec = graph.spec("maturin_build")
assert spec.cmd == get_maturin_build_command()
def test_install_all_command_graph_structure():
"""Test 'ia' command graph has correct dependencies."""
graphs = build_graphs()
graph = graphs["ia"]
specs = graph.all_specs()
assert len(specs) == 2
uv_install_spec = graph.spec("uv_install")
assert "maturin_dev" in uv_install_spec.depends_on
def test_clean_all_command_graph_structure():
"""Test 'ca' command graph has correct structure."""
graphs = build_graphs()
graph = graphs["ca"]
specs = graph.all_specs()
assert len(specs) == 2
def test_test_command_graph_structure():
"""Test 't' command graph has correct structure."""
graphs = build_graphs()
graph = graphs["t"]
specs = graph.all_specs()
assert len(specs) == 1
spec = graph.spec("pytest")
assert "pytest" in spec.cmd
def test_lint_command_graph_structure():
"""Test 'lint' command graph has correct structure."""
graphs = build_graphs()
graph = graphs["lint"]
specs = graph.all_specs()
assert len(specs) == 1
spec = graph.spec("ruff_check")
assert "ruff" in spec.cmd
def test_pymake_config_dirs_to_ignore():
"""Test PymakeConfig has correct dirs to ignore."""
assert ".venv" in conf.DIRS_TO_IGNORE
assert ".git" in conf.DIRS_TO_IGNORE
assert ".tox" in conf.DIRS_TO_IGNORE
def test_pymake_config_python_build_dirs():
"""Test PymakeConfig has correct Python build dirs."""
assert "dist" in conf.PYTHON_BUILD_DIRS
assert "build" in conf.PYTHON_BUILD_DIRS
def test_maturin_build_options_win7():
"""Test MATURIN_BUILD_OPTIONS_WIN7 has expected options."""
assert "--target" in conf.MATURIN_BUILD_OPTIONS_WIN7
assert "x86_64-win7-windows-msvc" in conf.MATURIN_BUILD_OPTIONS_WIN7
assert "-Zbuild-std" in conf.MATURIN_BUILD_OPTIONS_WIN7
def test_doc_build_command():
"""Test DOC_BUILD_COMMAND has expected structure."""
assert "sphinx-build" in conf.DOC_BUILD_COMMAND
assert "-b" in conf.DOC_BUILD_COMMAND
assert "html" in conf.DOC_BUILD_COMMAND
+10 -13
View File
@@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any
import pyflowx as px import pyflowx as px
from pyflowx.task import TaskResult, TaskSpec, TaskStatus from pyflowx.task import TaskResult, TaskSpec, TaskStatus
@@ -15,17 +16,17 @@ def _fn() -> int:
def _make_result( def _make_result(
name: str = "a", name: str = "a",
status: TaskStatus = TaskStatus.SUCCESS, status: TaskStatus = TaskStatus.SUCCESS,
value: object = 42, value: Any = 42,
error: BaseException | None = None, error: BaseException | None = None,
duration: float = 0.5, duration: float = 0.5,
attempts: int = 1, attempts: int = 1,
) -> TaskResult[object]: ) -> TaskResult[Any]:
"""构造测试用 TaskResult 实例.""" """构造测试用 TaskResult 实例."""
spec: TaskSpec[object] = TaskSpec[object](name, _fn) spec: TaskSpec[Any] = TaskSpec[Any](name, _fn)
start = datetime(2024, 1, 1, 0, 0, 0) start = datetime(2024, 1, 1, 0, 0, 0)
# 用 timedelta 精确表达秒数,避免 int() 截断小数 # 用 timedelta 精确表达秒数,避免 int() 截断小数
end = start + timedelta(seconds=duration) if duration else None end = start + timedelta(seconds=duration) if duration else None
return TaskResult[object]( return TaskResult[Any](
spec=spec, spec=spec,
status=status, status=status,
value=value, value=value,
@@ -85,7 +86,7 @@ class TestRunReportSummary:
def test_summary_with_none_duration(self) -> None: def test_summary_with_none_duration(self) -> None:
"""未开始/未结束的任务 duration 为 None,不应计入总时长.""" """未开始/未结束的任务 duration 为 None,不应计入总时长."""
report = px.RunReport() report = px.RunReport()
spec: TaskSpec[object] = TaskSpec("a", _fn) # type: ignore[arg-type] spec: TaskSpec[Any] = TaskSpec[Any]("a", _fn) # type: ignore[arg-type]
report.results["a"] = TaskResult(spec=spec, status=TaskStatus.FAILED) report.results["a"] = TaskResult(spec=spec, status=TaskStatus.FAILED)
s = report.summary() s = report.summary()
assert s["total_duration_seconds"] == 0.0 assert s["total_duration_seconds"] == 0.0
@@ -94,9 +95,7 @@ class TestRunReportSummary:
"""failed_tasks 应返回所有失败任务名.""" """failed_tasks 应返回所有失败任务名."""
report = px.RunReport() report = px.RunReport()
report.results["a"] = _make_result("a", status=TaskStatus.SUCCESS) report.results["a"] = _make_result("a", status=TaskStatus.SUCCESS)
report.results["b"] = _make_result( report.results["b"] = _make_result("b", status=TaskStatus.FAILED, error=ValueError("x"))
"b", status=TaskStatus.FAILED, error=ValueError("x")
)
assert report.failed_tasks() == ["b"] assert report.failed_tasks() == ["b"]
@@ -115,9 +114,7 @@ class TestRunReportDescribe:
def test_describe_with_error(self) -> None: def test_describe_with_error(self) -> None:
"""应正确描述失败状态和错误信息.""" """应正确描述失败状态和错误信息."""
report = px.RunReport(success=False) report = px.RunReport(success=False)
report.results["a"] = _make_result( report.results["a"] = _make_result("a", status=TaskStatus.FAILED, error=ValueError("boom"), duration=0.1)
"a", status=TaskStatus.FAILED, error=ValueError("boom"), duration=0.1
)
desc = report.describe() desc = report.describe()
assert "success=False" in desc assert "success=False" in desc
assert "error=ValueError" in desc assert "error=ValueError" in desc
@@ -125,7 +122,7 @@ class TestRunReportDescribe:
def test_describe_no_duration(self) -> None: def test_describe_no_duration(self) -> None:
"""无耗时的任务应显示为 '-'.""" """无耗时的任务应显示为 '-'."""
report = px.RunReport() report = px.RunReport()
spec: TaskSpec[object] = TaskSpec("a", _fn) # type: ignore[arg-type] spec: TaskSpec[Any] = TaskSpec[Any]("a", _fn) # type: ignore[arg-type]
report.results["a"] = TaskResult(spec=spec, status=TaskStatus.PENDING) report.results["a"] = TaskResult[Any](spec=spec, status=TaskStatus.PENDING)
desc = report.describe() desc = report.describe()
assert "-" in desc # duration 显示为 "-" assert "-" in desc # duration 显示为 "-"
+2 -16
View File
@@ -2,6 +2,7 @@
import sys import sys
import tempfile import tempfile
from pathlib import Path
import pytest import pytest
@@ -20,7 +21,6 @@ def test_taskspec_wrap_cmd_with_list():
spec = TaskSpec("test", cmd=[*ECHO_CMD, "hello"]) spec = TaskSpec("test", cmd=[*ECHO_CMD, "hello"])
wrapped_fn = spec.effective_fn wrapped_fn = spec.effective_fn
assert wrapped_fn is not None assert wrapped_fn is not None
assert wrapped_fn.__name__ == "test"
def test_taskspec_wrap_cmd_with_string(): def test_taskspec_wrap_cmd_with_string():
@@ -32,7 +32,6 @@ def test_taskspec_wrap_cmd_with_string():
spec = TaskSpec("test", cmd=cmd_str) spec = TaskSpec("test", cmd=cmd_str)
wrapped_fn = spec.effective_fn wrapped_fn = spec.effective_fn
assert wrapped_fn is not None assert wrapped_fn is not None
assert wrapped_fn.__name__ == "test"
def test_taskspec_wrap_cmd_with_timeout(): def test_taskspec_wrap_cmd_with_timeout():
@@ -48,7 +47,7 @@ def test_taskspec_wrap_cmd_with_timeout():
def test_taskspec_wrap_cmd_with_cwd(): def test_taskspec_wrap_cmd_with_cwd():
"""Test TaskSpec._wrap_cmd with working directory.""" """Test TaskSpec._wrap_cmd with working directory."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
spec = TaskSpec("test", cmd=[*ECHO_CMD, "hello"], cwd=tmpdir) spec = TaskSpec("test", cmd=[*ECHO_CMD, "hello"], cwd=Path(tmpdir))
wrapped_fn = spec.effective_fn wrapped_fn = spec.effective_fn
result = wrapped_fn() result = wrapped_fn()
assert result is None assert result is None
@@ -99,19 +98,6 @@ def test_taskspec_no_fn_no_cmd():
_ = TaskSpec("test") _ = TaskSpec("test")
def test_taskspec_cmd_overrides_fn():
"""Test TaskSpec cmd overrides fn."""
def my_fn():
return "fn_result"
spec = TaskSpec("test", fn=my_fn, cmd=[*ECHO_CMD, "hello"])
wrapped_fn = spec.effective_fn
# cmd should override fn
assert wrapped_fn.__name__ == "test"
def test_taskspec_conditions_check(): def test_taskspec_conditions_check():
"""Test TaskSpec.should_execute with conditions.""" """Test TaskSpec.should_execute with conditions."""
spec = px.TaskSpec( spec = px.TaskSpec(
+21 -59
View File
@@ -2,6 +2,7 @@
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Any
import pytest import pytest
@@ -357,27 +358,21 @@ class TestTaskSpecVerbose:
def test_verbose_default_is_false(self) -> None: def test_verbose_default_is_false(self) -> None:
"""verbose 默认应为 False.""" """verbose 默认应为 False."""
spec: px.TaskSpec[object] = px.TaskSpec("a", cmd=[*ECHO_CMD, "hi"]) spec: px.TaskSpec[Any] = px.TaskSpec[Any]("a", cmd=[*ECHO_CMD, "hi"])
assert spec.verbose is False assert spec.verbose is False
def test_verbose_true_prints_command( def test_verbose_true_prints_command(self, capsys: pytest.CaptureFixture[str]) -> None:
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""verbose=True 时应打印执行的命令.""" """verbose=True 时应打印执行的命令."""
graph = px.Graph.from_specs( graph = px.Graph.from_specs([px.TaskSpec("echo", cmd=[*ECHO_CMD, "verbose-output"], verbose=True)])
[px.TaskSpec("echo", cmd=[*ECHO_CMD, "verbose-output"], verbose=True)] _ = px.run(graph, strategy="sequential")
)
px.run(graph, strategy="sequential")
captured = capsys.readouterr() captured = capsys.readouterr()
assert "执行命令" in captured.out assert "执行命令" in captured.out
assert "返回码" in captured.out assert "返回码" in captured.out
def test_verbose_false_silent(self, capsys: pytest.CaptureFixture[str]) -> None: def test_verbose_false_silent(self, capsys: pytest.CaptureFixture[str]) -> None:
"""verbose=False 时不应打印命令信息.""" """verbose=False 时不应打印命令信息."""
graph = px.Graph.from_specs( graph = px.Graph.from_specs([px.TaskSpec[Any]("echo", cmd=[*ECHO_CMD, "silent"], verbose=False)])
[px.TaskSpec("echo", cmd=[*ECHO_CMD, "silent"], verbose=False)] _ = px.run(graph, strategy="sequential")
)
px.run(graph, strategy="sequential")
captured = capsys.readouterr() captured = capsys.readouterr()
assert "执行命令" not in captured.out assert "执行命令" not in captured.out
assert "返回码" not in captured.out assert "返回码" not in captured.out
@@ -390,7 +385,7 @@ class TestTaskSpecVerbose:
shell_cmd = "echo 'shell-verbose'" shell_cmd = "echo 'shell-verbose'"
graph = px.Graph.from_specs([px.TaskSpec("shell", cmd=shell_cmd, verbose=True)]) graph = px.Graph.from_specs([px.TaskSpec("shell", cmd=shell_cmd, verbose=True)])
px.run(graph, strategy="sequential") _ = px.run(graph, strategy="sequential")
captured = capsys.readouterr() captured = capsys.readouterr()
assert "执行 Shell" in captured.out assert "执行 Shell" in captured.out
@@ -399,16 +394,12 @@ class TestTaskSpecVerbose:
import tempfile import tempfile
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
graph = px.Graph.from_specs( graph = px.Graph.from_specs([px.TaskSpec[Any]("ls", cmd=ECHO_CMD, cwd=Path(tmpdir), verbose=True)])
[px.TaskSpec("ls", cmd=ECHO_CMD, cwd=Path(tmpdir), verbose=True)] _ = px.run(graph, strategy="sequential")
)
px.run(graph, strategy="sequential")
captured = capsys.readouterr() captured = capsys.readouterr()
assert "工作目录" in captured.out assert "工作目录" in captured.out
def test_verbose_failure_includes_returncode( def test_verbose_failure_includes_returncode(self, capsys: pytest.CaptureFixture[str]) -> None:
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""verbose=True 时失败也应打印返回码.""" """verbose=True 时失败也应打印返回码."""
from pyflowx.errors import TaskFailedError from pyflowx.errors import TaskFailedError
@@ -422,7 +413,7 @@ class TestTaskSpecVerbose:
] ]
) )
with pytest.raises(TaskFailedError): with pytest.raises(TaskFailedError):
px.run(graph, strategy="sequential") _ = px.run(graph, strategy="sequential")
captured = capsys.readouterr() captured = capsys.readouterr()
assert "返回码" in captured.out assert "返回码" in captured.out
@@ -437,16 +428,11 @@ class TestTaskSpecCmdErrors:
"""命令不存在时应抛出 RuntimeError.""" """命令不存在时应抛出 RuntimeError."""
from pyflowx.errors import TaskFailedError from pyflowx.errors import TaskFailedError
graph = px.Graph.from_specs( graph = px.Graph.from_specs([px.TaskSpec("missing", cmd=["this-command-does-not-exist-xyz"])])
[px.TaskSpec("missing", cmd=["this-command-does-not-exist-xyz"])]
)
with pytest.raises(TaskFailedError) as exc_info: with pytest.raises(TaskFailedError) as exc_info:
px.run(graph, strategy="sequential") _ = px.run(graph, strategy="sequential")
# 错误信息应包含命令未找到 # 错误信息应包含命令未找到
assert ( assert "命令未找到" in str(exc_info.value.cause) or "not found" in str(exc_info.value.cause).lower()
"命令未找到" in str(exc_info.value.cause)
or "not found" in str(exc_info.value.cause).lower()
)
def test_cmd_list_failure_includes_stderr(self) -> None: def test_cmd_list_failure_includes_stderr(self) -> None:
"""命令失败时错误信息应包含 stderr.""" """命令失败时错误信息应包含 stderr."""
@@ -465,7 +451,7 @@ class TestTaskSpecCmdErrors:
] ]
) )
with pytest.raises(TaskFailedError) as exc_info: with pytest.raises(TaskFailedError) as exc_info:
px.run(graph, strategy="sequential") _ = px.run(graph, strategy="sequential")
# 非 verbose 模式下, stderr 应包含在错误信息中 # 非 verbose 模式下, stderr 应包含在错误信息中
assert "error-msg" in str(exc_info.value.cause) assert "error-msg" in str(exc_info.value.cause)
@@ -473,19 +459,15 @@ class TestTaskSpecCmdErrors:
"""shell 命令不存在时应抛出 RuntimeError.""" """shell 命令不存在时应抛出 RuntimeError."""
from pyflowx.errors import TaskFailedError from pyflowx.errors import TaskFailedError
graph = px.Graph.from_specs( graph = px.Graph.from_specs([px.TaskSpec("missing", cmd="this-command-does-not-exist-xyz-123")])
[px.TaskSpec("missing", cmd="this-command-does-not-exist-xyz-123")]
)
with pytest.raises(TaskFailedError): with pytest.raises(TaskFailedError):
px.run(graph, strategy="sequential") _ = px.run(graph, strategy="sequential")
def test_cmd_string_failure(self) -> None: def test_cmd_string_failure(self) -> None:
"""shell 命令失败时应抛出 RuntimeError.""" """shell 命令失败时应抛出 RuntimeError."""
from pyflowx.errors import TaskFailedError from pyflowx.errors import TaskFailedError
graph = px.Graph.from_specs( graph = px.Graph.from_specs([px.TaskSpec("fail", cmd='python -c "import sys; sys.exit(1)"')])
[px.TaskSpec("fail", cmd='python -c "import sys; sys.exit(1)"')]
)
with pytest.raises(TaskFailedError) as exc_info: with pytest.raises(TaskFailedError) as exc_info:
_ = px.run(graph, strategy="sequential") _ = px.run(graph, strategy="sequential")
assert "Shell 命令执行失败" in str(exc_info.value.cause) assert "Shell 命令执行失败" in str(exc_info.value.cause)
@@ -513,32 +495,12 @@ class TestTaskSpecCmdErrors:
"""shell 命令超时应抛出 RuntimeError.""" """shell 命令超时应抛出 RuntimeError."""
from pyflowx.errors import TaskFailedError from pyflowx.errors import TaskFailedError
graph = px.Graph.from_specs( graph = px.Graph.from_specs([px.TaskSpec("slow", cmd='python -c "import time; time.sleep(5)"', timeout=0.1)])
[
px.TaskSpec(
"slow", cmd='python -c "import time; time.sleep(5)"', timeout=0.1
)
]
)
with pytest.raises(TaskFailedError) as exc_info: with pytest.raises(TaskFailedError) as exc_info:
_ = px.run(graph, strategy="sequential") _ = px.run(graph, strategy="sequential")
assert "超时" in str(exc_info.value.cause) assert "超时" in str(exc_info.value.cause)
def test_unsupported_cmd_type_raises(self) -> None:
"""不支持的 cmd 类型应在执行时抛出 TypeError."""
from pyflowx.errors import TaskFailedError
graph = px.Graph.from_specs(
[px.TaskSpec("bad", cmd=123)] # type: ignore[arg-type]
)
with pytest.raises((TypeError, TaskFailedError)):
_ = px.run(graph, strategy="sequential")
def test_no_fn_no_cmd_raises(self) -> None: def test_no_fn_no_cmd_raises(self) -> None:
"""没有 fn 和 cmd 时应抛出 ValueError.""" """没有 fn 和 cmd 时应抛出 ValueError."""
with pytest.raises(ValueError, match="必须提供 fn 或 cmd"): with pytest.raises(ValueError, match="必须提供 fn 或 cmd"):
px.TaskSpec("empty") _ = px.TaskSpec("empty")
if __name__ == "__main__":
pytest.main([__file__, "-v"])
Generated
+1 -1
View File
@@ -2221,7 +2221,7 @@ wheels = [
[[package]] [[package]]
name = "pyflowx" name = "pyflowx"
version = "0.1.2" version = "0.1.3"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "graphlib-backport", marker = "python_full_version < '3.9'" }, { name = "graphlib-backport", marker = "python_full_version < '3.9'" },