Compare commits
5 Commits
232e7293d9
...
v0.2.12
| Author | SHA1 | Date | |
|---|---|---|---|
| 3f9c52e6f1 | |||
| 8fadf6edd8 | |||
| abc1152538 | |||
| 5e561b4b3a | |||
| 40f641611b |
+1
-1
@@ -21,7 +21,7 @@ license = { text = "MIT" }
|
||||
name = "pyflowx"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.8"
|
||||
version = "0.2.11"
|
||||
version = "0.2.12"
|
||||
|
||||
[project.scripts]
|
||||
autofmt = "pyflowx.cli.autofmt:main"
|
||||
|
||||
@@ -94,10 +94,12 @@ from .task import (
|
||||
TaskResult,
|
||||
TaskSpec,
|
||||
TaskStatus,
|
||||
cmd,
|
||||
task,
|
||||
task_template,
|
||||
)
|
||||
|
||||
__version__ = "0.3.5"
|
||||
__version__ = "0.3.6"
|
||||
|
||||
__all__ = [
|
||||
"IS_LINUX",
|
||||
@@ -135,9 +137,11 @@ __all__ = [
|
||||
"TaskStatus",
|
||||
"TaskTimeoutError",
|
||||
"build_call_args",
|
||||
"cmd",
|
||||
"compose",
|
||||
"describe_injection",
|
||||
"run",
|
||||
"run_command",
|
||||
"task",
|
||||
"task_template",
|
||||
]
|
||||
|
||||
@@ -66,19 +66,10 @@ def backup_folder(src: str, dst: str, max_zip: int = 5) -> None:
|
||||
zip_target(src_path, dst_path, max_zip)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# TaskSpec 定义
|
||||
# ============================================================================
|
||||
|
||||
folderback_default: px.TaskSpec = px.TaskSpec(
|
||||
"folderback_default",
|
||||
fn=lambda: backup_folder(".", "./backup", 5),
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CLI Runner
|
||||
# ============================================================================
|
||||
@px.task
|
||||
def folderback_default() -> None:
|
||||
"""备份当前目录到 ./backup."""
|
||||
backup_folder(".", "./backup", 5)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
@@ -86,9 +77,9 @@ def main() -> None:
|
||||
runner = px.CliRunner(
|
||||
strategy="thread",
|
||||
description="FolderBack - 文件夹备份工具",
|
||||
graphs={
|
||||
aliases={
|
||||
# 备份当前目录到 ./backup
|
||||
"b": px.Graph.from_specs([folderback_default]),
|
||||
"b": folderback_default,
|
||||
},
|
||||
)
|
||||
runner.run_cli()
|
||||
|
||||
@@ -57,16 +57,10 @@ def zip_folders(cwd: str = ".") -> None:
|
||||
archive_folder(dir_path)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# TaskSpec 定义
|
||||
# ============================================================================
|
||||
|
||||
folderzip_default: px.TaskSpec = px.TaskSpec("folderzip_default", fn=lambda: zip_folders("."))
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CLI Runner
|
||||
# ============================================================================
|
||||
@px.task
|
||||
def folderzip_default() -> None:
|
||||
"""压缩当前目录下的所有文件夹."""
|
||||
zip_folders(".")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
@@ -74,9 +68,9 @@ def main() -> None:
|
||||
runner = px.CliRunner(
|
||||
strategy="thread",
|
||||
description="FolderZip - 文件夹压缩工具",
|
||||
graphs={
|
||||
aliases={
|
||||
# 压缩当前目录下的所有文件夹
|
||||
"z": px.Graph.from_specs([folderzip_default]),
|
||||
"z": folderzip_default,
|
||||
},
|
||||
)
|
||||
runner.run_cli()
|
||||
|
||||
+15
-10
@@ -46,7 +46,12 @@ def init_sub_dirs() -> None:
|
||||
)
|
||||
|
||||
|
||||
isub: px.TaskSpec = px.TaskSpec("isub", fn=init_sub_dirs)
|
||||
@px.task(name="isub")
|
||||
def isub() -> None:
|
||||
"""初始化子目录的Git仓库."""
|
||||
init_sub_dirs()
|
||||
|
||||
|
||||
push: px.TaskSpec = px.TaskSpec("push", cmd=["git", "push"])
|
||||
pull: px.TaskSpec = px.TaskSpec("pull", cmd=["git", "pull"])
|
||||
kill_tgit: px.TaskSpec = px.TaskSpec("task_kill", cmd=["taskkill", "/f", "/t", "/im", "tgitcache.exe"])
|
||||
@@ -67,17 +72,17 @@ def main() -> None:
|
||||
runner = px.CliRunner(
|
||||
strategy="thread",
|
||||
description="Gittool - Git 执行工具.",
|
||||
graphs={
|
||||
aliases={
|
||||
# 添加并提交
|
||||
"a": px.Graph.from_specs([
|
||||
px.TaskSpec("add", cmd=["git", "add", "."], conditions=(lambda _: has_files(),)),
|
||||
px.TaskSpec("commit", cmd=["git", "commit", "-m", "chore: update"], depends_on=("add",)),
|
||||
]),
|
||||
# 清理
|
||||
"c": px.Graph.from_specs([
|
||||
# 清理(chain: clean → status)
|
||||
"c": px.Graph().chain(
|
||||
px.TaskSpec("clean", cmd=["git", "clean", "-xfd", *EXCLUDE_CMDS]),
|
||||
px.TaskSpec("status", cmd=["git", "status", "--porcelain"], depends_on=("clean",)),
|
||||
]),
|
||||
px.TaskSpec("status", cmd=["git", "status", "--porcelain"]),
|
||||
),
|
||||
# 初始化、添加并提交
|
||||
"i": px.Graph.from_specs([
|
||||
px.TaskSpec("init", cmd=["git", "init"], conditions=(lambda _: not_has_git_repo(),)),
|
||||
@@ -90,13 +95,13 @@ def main() -> None:
|
||||
),
|
||||
]),
|
||||
# 初始化子目录
|
||||
"isub": px.Graph.from_specs([isub]),
|
||||
"isub": isub,
|
||||
# 推送
|
||||
"p": px.Graph.from_specs([push]),
|
||||
"p": push,
|
||||
# 拉取
|
||||
"pl": px.Graph.from_specs([pull]),
|
||||
"pl": pull,
|
||||
# 重启TGit缓存
|
||||
"r": px.Graph.from_specs([kill_tgit]),
|
||||
"r": kill_tgit,
|
||||
},
|
||||
)
|
||||
runner.run_cli()
|
||||
|
||||
+58
-67
@@ -9,48 +9,65 @@ from __future__ import annotations
|
||||
import pyflowx as px
|
||||
from pyflowx.conditions import Constants
|
||||
|
||||
MATURIN_BUILD_COMMAND = ["maturin", "build", "-r"]
|
||||
if Constants.IS_WINDOWS:
|
||||
MATURIN_BUILD_COMMAND.extend(["--target", "x86_64-win7-windows-msvc", "-Zbuild-std", "-i", "python3.8"])
|
||||
|
||||
def maturin_build_cmd() -> list[str]:
|
||||
"""获取 maturin 构建命令(根据平台自动添加参数).
|
||||
# 扁平注册所有任务(px.cmd 自动从命令前两段推导 name)
|
||||
tasks: list[px.TaskSpec] = [
|
||||
px.cmd(["uv", "build"]),
|
||||
px.cmd(MATURIN_BUILD_COMMAND),
|
||||
px.cmd(["uv", "sync"]),
|
||||
px.cmd(["gitt", "c"], name="git_clean"),
|
||||
px.cmd(
|
||||
["pytest", "-m", "not slow", "-n", "8", "--dist", "loadfile", "--color=yes", "--durations=10"],
|
||||
name="test",
|
||||
),
|
||||
px.cmd(
|
||||
["pytest", "-m", "not slow", "--dist", "loadfile", "--color=yes", "--durations=10"],
|
||||
name="test_fast",
|
||||
),
|
||||
px.cmd(
|
||||
["pytest", "--cov", "-n", "8", "--dist", "loadfile", "--tb=short", "-v", "--color=yes", "--durations=10"],
|
||||
name="test_coverage",
|
||||
),
|
||||
px.cmd(["pyrefly", "check", "."]),
|
||||
px.cmd(["git", "add", "-A"], name="git_add_all"),
|
||||
px.cmd(["bumpversion"]),
|
||||
px.cmd(["bumpversion", "minor"]),
|
||||
px.cmd(["git", "push"]),
|
||||
px.cmd(["git", "push", "--tags"], name="git_push_tags"),
|
||||
px.cmd(["hatch", "publish"], name="publish_python"),
|
||||
px.cmd(["twine", "upload", "--disable-progress-bar"], name="twine_publish"),
|
||||
]
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[str]
|
||||
完整的 maturin 构建命令列表.
|
||||
"""
|
||||
command = ["maturin", "build", "-r"].copy()
|
||||
if Constants.IS_WINDOWS:
|
||||
command.extend(["--target", "x86_64-win7-windows-msvc", "-Zbuild-std", "-i", "python3.8"])
|
||||
return command
|
||||
# 单任务别名(alias 名与任务名相同):直接内联 TaskSpec,避免 str 自引用
|
||||
aliases: dict[str, str | list[str | px.TaskSpec] | px.TaskSpec | px.Graph] = {
|
||||
# 构建命令
|
||||
"b": "uv_build",
|
||||
"bc": "maturin_build",
|
||||
"ba": ["b", "bc"],
|
||||
# 安装命令
|
||||
"sync": "uv_sync",
|
||||
# 清理命令
|
||||
"c": "git_clean",
|
||||
# 开发工具
|
||||
"bump": ["c", "tc", "git_add_all", "bumpversion"],
|
||||
"bumpmi": "bumpversion_minor",
|
||||
"cov": ["git_clean", "test_coverage"],
|
||||
"doc": px.cmd(["sphinx-build", "-b", "html", "docs", "docs/_build"], name="doc"),
|
||||
"lint": px.cmd(["ruff", "check", "--fix", "--unsafe-fixes"], name="lint"),
|
||||
"pb": ["twine_publish", "publish_python"],
|
||||
"t": "test",
|
||||
"tf": "test_fast",
|
||||
"tc": ["pyrefly_check", "lint"],
|
||||
"tox": px.cmd(["tox", "-p", "auto"], name="tox"),
|
||||
# 发布命令
|
||||
"p": ["git_clean", "git_push", "git_push_tags"],
|
||||
}
|
||||
|
||||
|
||||
uv_build: px.TaskSpec = px.TaskSpec("uv_build", cmd=["uv", "build"])
|
||||
maturin_build: px.TaskSpec = px.TaskSpec("maturin_build", cmd=maturin_build_cmd())
|
||||
uv_sync: px.TaskSpec = px.TaskSpec("uv_sync", cmd=["uv", "sync"])
|
||||
git_clean: px.TaskSpec = px.TaskSpec("git_clean", cmd=["gitt", "c"])
|
||||
test: px.TaskSpec = px.TaskSpec(
|
||||
"test", cmd=["pytest", "-m", "not slow", "-n", "8", "--dist", "loadfile", "--color=yes", "--durations=10"]
|
||||
)
|
||||
test_fast: px.TaskSpec = px.TaskSpec(
|
||||
"test_fast", cmd=["pytest", "-m", "not slow", "--dist", "loadfile", "--color=yes", "--durations=10"]
|
||||
)
|
||||
test_coverage: px.TaskSpec = px.TaskSpec(
|
||||
"test_coverage",
|
||||
cmd=["pytest", "--cov", "-n", "8", "--dist", "loadfile", "--tb=short", "-v", "--color=yes", "--durations=10"],
|
||||
)
|
||||
ruff_lint: px.TaskSpec = px.TaskSpec("lint", cmd=["ruff", "check", "--fix", "--unsafe-fixes"])
|
||||
typecheck: px.TaskSpec = px.TaskSpec("pyrefly_check", cmd=["pyrefly", "check", "."])
|
||||
git_add_all: px.TaskSpec = px.TaskSpec("git_add_all", cmd=["git", "add", "-A"])
|
||||
bump: px.TaskSpec = px.TaskSpec("bumpversion", cmd=["bumpversion"])
|
||||
doc: px.TaskSpec = px.TaskSpec("doc", cmd=["sphinx-build", "-b", "html", "docs", "docs/_build"])
|
||||
git_push: px.TaskSpec = px.TaskSpec("git_push", cmd=["git", "push"])
|
||||
git_push_tags: px.TaskSpec = px.TaskSpec("git_push_tags", cmd=["git", "push", "--tags"])
|
||||
hatch_publish: px.TaskSpec = px.TaskSpec("publish_python", cmd=["hatch", "publish"])
|
||||
twine_publish: px.TaskSpec = px.TaskSpec("twine_publish", cmd=["twine", "upload", "--disable-progress-bar"])
|
||||
tox: px.TaskSpec = px.TaskSpec("tox", cmd=["tox", "-p", "auto"])
|
||||
|
||||
|
||||
def main():
|
||||
def main() -> None:
|
||||
"""pymake 构建工具.
|
||||
|
||||
🔨 构建命令:
|
||||
@@ -78,10 +95,10 @@ def main():
|
||||
📦 发布命令:
|
||||
pymake pb - 发布到 PyPI (twine + hatch)
|
||||
|
||||
� 版本管理:
|
||||
🔖 版本管理:
|
||||
pymake bump - 自动升级版本号并提交修改 (清理 + 检查 + 格式化 + git add + bumpversion)
|
||||
|
||||
�💡 常用工作流:
|
||||
💡 常用工作流:
|
||||
1. 日常开发: pymake lint && pymake t
|
||||
2. 构建发布包: pymake ba
|
||||
3. 多版本兼容性测试: pymake tox
|
||||
@@ -95,31 +112,5 @@ def main():
|
||||
pymake lint # 格式化代码
|
||||
pymake type # 类型检查
|
||||
"""
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
description="PyMake - Python 构建工具",
|
||||
graphs={
|
||||
# 构建命令
|
||||
"b": px.Graph.from_specs([uv_build]),
|
||||
"bc": px.Graph.from_specs([maturin_build]),
|
||||
"ba": px.Graph.from_specs(["b", "bc"]),
|
||||
# 安装命令
|
||||
"sync": px.Graph.from_specs([uv_sync]),
|
||||
# 清理命令
|
||||
"c": px.Graph.from_specs([git_clean]),
|
||||
# 开发工具
|
||||
"bump": px.Graph.from_specs(["c", "tc", git_add_all, bump]),
|
||||
"bumpmi": px.Graph.from_specs([px.TaskSpec("bumpversion_minor", cmd=["bumpversion", "minor"])]),
|
||||
"cov": px.Graph.from_specs([git_clean, test_coverage]),
|
||||
"doc": px.Graph.from_specs([doc]),
|
||||
"lint": px.Graph.from_specs([ruff_lint]),
|
||||
"pb": px.Graph.from_specs([twine_publish, hatch_publish]),
|
||||
"t": px.Graph.from_specs([test]),
|
||||
"tf": px.Graph.from_specs([test_fast]),
|
||||
"tc": px.Graph.from_specs([typecheck, "lint"]),
|
||||
"tox": px.Graph.from_specs([tox]),
|
||||
# 发布命令
|
||||
"p": px.Graph.from_specs([git_clean, git_push, git_push_tags]),
|
||||
},
|
||||
)
|
||||
runner = px.CliRunner(strategy="sequential", description="PyMake - Python 构建工具", tasks=tasks, aliases=aliases)
|
||||
runner.run_cli()
|
||||
|
||||
@@ -41,7 +41,9 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import atexit
|
||||
import concurrent.futures
|
||||
import contextlib
|
||||
import inspect
|
||||
import logging
|
||||
import threading
|
||||
@@ -58,6 +60,59 @@ from .task import TaskEvent, TaskHooks, TaskResult, TaskSpec, TaskStatus
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# 进程池复用:同一次 run() 内的 process 任务共享一个 ProcessPoolExecutor。
|
||||
# 模块级缓存避免每次任务都创建/销毁进程池的开销。
|
||||
# run() 结束后通过 _shutdown_process_pool() 关闭(shutdown(wait=False) +
|
||||
# kill 工作进程),避免 Python 退出时 threading._shutdown 等待管理线程
|
||||
# join 工作进程导致数秒阻塞。
|
||||
_process_pool: concurrent.futures.ProcessPoolExecutor | None = None
|
||||
_process_pool_lock = threading.Lock()
|
||||
|
||||
|
||||
def _get_process_pool() -> concurrent.futures.ProcessPoolExecutor:
|
||||
"""获取复用的进程池(惰性创建)。"""
|
||||
global _process_pool # noqa: PLW0603
|
||||
if _process_pool is None:
|
||||
with _process_pool_lock:
|
||||
if _process_pool is None:
|
||||
_process_pool = concurrent.futures.ProcessPoolExecutor()
|
||||
return _process_pool
|
||||
|
||||
|
||||
def _shutdown_process_pool() -> None:
|
||||
"""关闭复用的进程池。
|
||||
|
||||
``shutdown(wait=False)`` 通知管理线程退出(管理线程是非 daemon,
|
||||
``threading._shutdown`` 会等待它);同时 kill 工作进程,避免管理线程
|
||||
在退出前逐个 join 工作进程导致数秒阻塞。
|
||||
"""
|
||||
global _process_pool # noqa: PLW0603
|
||||
if _process_pool is not None:
|
||||
pool = _process_pool
|
||||
_process_pool = None
|
||||
# 在 shutdown 前获取进程列表(管理线程退出会清空 _processes)。
|
||||
# _processes 是 ProcessPoolExecutor 的私有属性,无公开 API 替代。
|
||||
procs = list((getattr(pool, "_processes", None) or {}).values())
|
||||
pool.shutdown(wait=False)
|
||||
# 强制终止工作进程(SIGKILL),避免管理线程 join 导致 ~7s 阻塞。
|
||||
for proc in procs:
|
||||
with contextlib.suppress(ProcessLookupError, AttributeError):
|
||||
proc.kill() # type: ignore[attr-defined]
|
||||
|
||||
|
||||
# 兜底:防止未经 run() 直接使用执行器的场景导致进程池泄漏。
|
||||
atexit.register(_shutdown_process_pool)
|
||||
|
||||
|
||||
def _run_in_process(fn: Any, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Any:
|
||||
"""模块级函数:在进程池中执行任务(须可 pickle)。
|
||||
|
||||
env_context 等上下文管理器无法跨进程传递,进程池任务的 ``env``/``cwd``
|
||||
不生效;如需设置环境,应在 ``fn`` 内部自行处理。
|
||||
"""
|
||||
return fn(*args, **kwargs)
|
||||
|
||||
|
||||
# 观察者回调类型。
|
||||
EventCallback = Callable[[TaskEvent], None]
|
||||
Strategy = Literal["sequential", "thread", "async", "dependency"]
|
||||
@@ -391,19 +446,50 @@ async def _execute_async_task(
|
||||
loop: asyncio.AbstractEventLoop,
|
||||
) -> Any:
|
||||
"""执行异步或同步任务(带超时处理)。"""
|
||||
# 异步任务直接 await
|
||||
if _is_async_fn(spec):
|
||||
coro = cast(Awaitable[Any], spec.effective_fn(*args, **kwargs))
|
||||
if spec.timeout is not None:
|
||||
return await asyncio.wait_for(coro, timeout=spec.timeout)
|
||||
return await coro
|
||||
return await asyncio.wait_for(coro, timeout=spec.timeout) if spec.timeout is not None else await coro
|
||||
|
||||
# 同步任务:根据 executor 选择执行器
|
||||
fut = _submit_sync_task(spec, args, kwargs, loop)
|
||||
return await asyncio.wait_for(fut, timeout=spec.timeout) if spec.timeout is not None else await fut
|
||||
|
||||
|
||||
def _submit_sync_task(
|
||||
spec: TaskSpec[Any],
|
||||
args: tuple[Any, ...],
|
||||
kwargs: dict[str, Any],
|
||||
loop: asyncio.AbstractEventLoop,
|
||||
) -> asyncio.Future[Any]:
|
||||
"""提交同步任务到对应执行器,返回 Future。
|
||||
|
||||
* ``inline``:直接在事件循环线程调用(阻塞循环,最快)。
|
||||
* ``process``:进程池执行(绕过 GIL,fn 须可 pickle)。
|
||||
* ``thread``(默认):线程池执行。
|
||||
"""
|
||||
|
||||
def fn_call() -> Any:
|
||||
with spec.env_context():
|
||||
return spec.effective_fn(*args, **kwargs)
|
||||
|
||||
if spec.timeout is not None:
|
||||
return await asyncio.wait_for(loop.run_in_executor(None, fn_call), timeout=spec.timeout)
|
||||
return await loop.run_in_executor(None, fn_call)
|
||||
# inline:直接在事件循环线程调用,无线程池开销,但会阻塞循环。
|
||||
if spec.executor == "inline":
|
||||
result = fn_call()
|
||||
fut: asyncio.Future[Any] = loop.create_future()
|
||||
fut.set_result(result)
|
||||
return fut
|
||||
|
||||
# process:进程池执行,绕过 GIL,适合 CPU 密集型任务(fn 须可 pickle)。
|
||||
if spec.executor == "process":
|
||||
from functools import partial
|
||||
|
||||
pool = _get_process_pool()
|
||||
proc_fn = partial(_run_in_process, spec.effective_fn, args, kwargs)
|
||||
return loop.run_in_executor(pool, proc_fn)
|
||||
|
||||
# thread(默认):线程池执行。
|
||||
return loop.run_in_executor(None, fn_call)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------- #
|
||||
@@ -662,7 +748,7 @@ def _make_verbose_callback(on_event: EventCallback | None) -> EventCallback:
|
||||
|
||||
def run(
|
||||
graph: Graph,
|
||||
strategy: Strategy = "sequential",
|
||||
strategy: Strategy = "dependency",
|
||||
*,
|
||||
max_workers: int | None = None,
|
||||
dry_run: bool = False,
|
||||
@@ -678,8 +764,8 @@ def run(
|
||||
graph:
|
||||
待执行的已校验 :class:`Graph`。
|
||||
strategy:
|
||||
执行策略: ``"sequential"`` / ``"thread"`` / ``"async"`` /
|
||||
``"dependency"``。``"dependency"`` 为依赖驱动调度,无层屏障。
|
||||
执行策略: ``"dependency"``(默认,依赖驱动无层屏障,最大并行度)/
|
||||
``"sequential"`` / ``"thread"`` / ``"async"``(层屏障模型)。
|
||||
max_workers:
|
||||
``"thread"`` 的线程池大小。默认 ``min(32, len(layer))``。
|
||||
dry_run:
|
||||
@@ -737,6 +823,10 @@ def run(
|
||||
except TaskFailedError:
|
||||
report.success = False
|
||||
raise
|
||||
finally:
|
||||
# 关闭进程池:通知管理线程退出 + kill 工作进程,避免
|
||||
# threading._shutdown 等待管理线程 join 工作进程导致 ~7s 阻塞。
|
||||
_shutdown_process_pool()
|
||||
|
||||
return report
|
||||
|
||||
|
||||
+138
-2
@@ -17,12 +17,13 @@ __all__ = [
|
||||
"GraphDefaults",
|
||||
]
|
||||
|
||||
import inspect
|
||||
import sys
|
||||
from dataclasses import dataclass, field, replace
|
||||
from typing import Any, Callable, Iterable, Mapping, Sequence
|
||||
|
||||
from .errors import CycleError, DuplicateTaskError, MissingDependencyError
|
||||
from .task import RetryPolicy, TaskSpec
|
||||
from .task import Context, RetryPolicy, TaskSpec
|
||||
|
||||
if sys.version_info >= (3, 9): # pragma: no cover
|
||||
import graphlib # pyright: ignore[reportUnreachable]
|
||||
@@ -63,6 +64,74 @@ def _prune_deps(spec: TaskSpec[Any], keep: Callable[[str], bool]) -> TaskSpec[An
|
||||
)
|
||||
|
||||
|
||||
def _make_namespaced_fn(orig_fn: Any, ns: str, dep_names: set[str]) -> Any:
|
||||
"""包装 fn,使其能接收带 ``ns:`` 前缀的依赖名,调用时映射回原参数名。
|
||||
|
||||
命名空间合并后,依赖名带前缀(如 ``build:extract``),但 Python 参数名
|
||||
不能含 ``:``。wrapper 用 ``**kwargs`` 接收所有依赖,内部把带前缀的依赖名
|
||||
映射回原参数名后调用原 fn。
|
||||
|
||||
无依赖参数时直接返回原 fn。
|
||||
"""
|
||||
if not dep_names or orig_fn is None:
|
||||
return orig_fn
|
||||
try:
|
||||
orig_sig = inspect.signature(orig_fn)
|
||||
except (TypeError, ValueError):
|
||||
return orig_fn
|
||||
|
||||
# 带前缀依赖名 -> 原参数名
|
||||
name_map: dict[str, str] = {f"{ns}:{orig}": orig for orig in dep_names}
|
||||
prefix = f"{ns}:"
|
||||
|
||||
# 检查原 fn 是否有 Context 标注参数
|
||||
context_param_name: str | None = None
|
||||
for p in orig_sig.parameters.values():
|
||||
ann = p.annotation
|
||||
if ann is not Context and not (isinstance(ann, str) and ann.endswith("Context")):
|
||||
continue
|
||||
context_param_name = p.name
|
||||
break
|
||||
|
||||
if context_param_name is not None:
|
||||
|
||||
def wrapper(ctx: Any = None, **kwargs: Any) -> Any:
|
||||
# ctx 是 dep_context,键为带前缀的依赖名;映射回原始键
|
||||
orig_ctx: dict[str, Any] = {}
|
||||
for k, v in (ctx or {}).items():
|
||||
orig_ctx[name_map.get(k, k)] = v
|
||||
# kwargs 中带前缀的依赖也映射回原参数名
|
||||
for k, v in kwargs.items():
|
||||
if k in name_map:
|
||||
orig_ctx[name_map[k]] = v
|
||||
return orig_fn(**{context_param_name: orig_ctx})
|
||||
|
||||
ctx_param = inspect.Parameter("ctx", inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=Context)
|
||||
kw_param = inspect.Parameter("kwargs", inspect.Parameter.VAR_KEYWORD)
|
||||
wrapper.__signature__ = inspect.Signature( # type: ignore[attr-defined]
|
||||
parameters=[ctx_param, kw_param],
|
||||
return_annotation=orig_sig.return_annotation,
|
||||
)
|
||||
else:
|
||||
|
||||
def wrapper(**kwargs: Any) -> Any: # type: ignore[no-redef]
|
||||
orig_kwargs: dict[str, Any] = {}
|
||||
for k, v in kwargs.items():
|
||||
if k.startswith(prefix):
|
||||
orig_kwargs[k[len(prefix) :]] = v
|
||||
return orig_fn(**orig_kwargs)
|
||||
|
||||
kw_param = inspect.Parameter("kwargs", inspect.Parameter.VAR_KEYWORD)
|
||||
wrapper.__signature__ = inspect.Signature( # type: ignore[attr-defined]
|
||||
parameters=[kw_param],
|
||||
return_annotation=orig_sig.return_annotation,
|
||||
)
|
||||
|
||||
wrapper.__name__ = f"{ns}_{getattr(orig_fn, '__name__', 'fn')}"
|
||||
wrapper.__doc__ = getattr(orig_fn, "__doc__", None)
|
||||
return wrapper
|
||||
|
||||
|
||||
@dataclass
|
||||
class Graph:
|
||||
"""校验后的有向无环任务图。
|
||||
@@ -78,6 +147,7 @@ class Graph:
|
||||
specs: dict[str, TaskSpec[Any]] = field(default_factory=dict)
|
||||
deps: dict[str, tuple[str, ...]] = field(default_factory=dict)
|
||||
defaults: GraphDefaults = field(default_factory=GraphDefaults)
|
||||
namespace: str | None = None
|
||||
|
||||
# 待解析的字符串引用列表(由 GraphComposer 消费);为空表示无引用。
|
||||
_pending_refs: list[str] = field(default_factory=list)
|
||||
@@ -95,6 +165,28 @@ class Graph:
|
||||
self._validate_references()
|
||||
return self
|
||||
|
||||
def chain(self, *specs: TaskSpec[Any]) -> Graph:
|
||||
"""链式注册任务:每个 spec 自动依赖前一个。
|
||||
|
||||
``chain(a, b, c)`` 等价于 ``b`` 依赖 ``a``,``c`` 依赖 ``b``。
|
||||
若 spec 已带 ``depends_on``,则前驱名追加到现有依赖前。
|
||||
返回 ``self`` 支持链式调用。
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> graph = px.Graph().chain(extract, transform, load)
|
||||
"""
|
||||
prev_name: str | None = None
|
||||
for s in specs:
|
||||
current = s
|
||||
if prev_name is not None:
|
||||
# 将前驱追加到 depends_on 最前(保持显式依赖优先)
|
||||
new_deps = (prev_name, *s.depends_on) if prev_name not in s.depends_on else s.depends_on
|
||||
current = replace(s, depends_on=new_deps)
|
||||
self.add(current)
|
||||
prev_name = current.name
|
||||
return self
|
||||
|
||||
def _register(self, spec: TaskSpec[Any]) -> None:
|
||||
if spec.name in self.specs:
|
||||
raise DuplicateTaskError(spec.name)
|
||||
@@ -108,6 +200,8 @@ class Graph:
|
||||
cls,
|
||||
specs: Iterable[TaskSpec[Any] | str],
|
||||
defaults: GraphDefaults | None = None,
|
||||
*,
|
||||
namespace: str | None = None,
|
||||
) -> Graph:
|
||||
"""从可迭代的 task spec 构建图。
|
||||
|
||||
@@ -120,8 +214,10 @@ class Graph:
|
||||
TaskSpec 对象或字符串引用的列表。
|
||||
defaults:
|
||||
图级默认值。``None`` 使用空 :class:`GraphDefaults`。
|
||||
namespace:
|
||||
可选命名空间,用于 :meth:`add_subgraph` 合并时加前缀。
|
||||
"""
|
||||
graph = cls(defaults=defaults or GraphDefaults())
|
||||
graph = cls(defaults=defaults or GraphDefaults(), namespace=namespace)
|
||||
pending_refs: list[str] = []
|
||||
|
||||
for spec in specs:
|
||||
@@ -139,6 +235,46 @@ class Graph:
|
||||
graph.validate()
|
||||
return graph
|
||||
|
||||
def add_subgraph(self, sub: Graph, *, namespace: str | None = None) -> Graph:
|
||||
"""将子图合并到当前图,任务名加命名空间前缀避免冲突。
|
||||
|
||||
参数
|
||||
----
|
||||
sub:
|
||||
待合并的子图。
|
||||
namespace:
|
||||
命名空间前缀。``None`` 时使用 ``sub.namespace``,若子图也无命名空间
|
||||
则抛出 ``ValueError``。最终任务名为 ``f"{ns}:{original_name}"``。
|
||||
|
||||
合并后,子图内任务的依赖名也会被加前缀;与子图外部任务的依赖保持原样。
|
||||
|
||||
返回 ``self`` 支持链式调用。
|
||||
"""
|
||||
ns = namespace or sub.namespace
|
||||
if not ns:
|
||||
raise ValueError("add_subgraph 需要 namespace 或子图自带 namespace")
|
||||
|
||||
def _rename(name: str) -> str:
|
||||
# 仅对子图内部任务名加前缀;外部依赖保持原样
|
||||
return f"{ns}:{name}" if name in sub.specs else name
|
||||
|
||||
sub_names = set(sub.specs.keys())
|
||||
for spec in sub.specs.values():
|
||||
# 子图内部依赖名需加前缀,对应的 fn 参数也需包装
|
||||
internal_deps = (set(spec.depends_on) | set(spec.soft_depends_on)) & sub_names
|
||||
new_fn = _make_namespaced_fn(spec.fn, ns, internal_deps) if spec.fn else spec.fn
|
||||
new_spec = replace(
|
||||
spec,
|
||||
name=_rename(spec.name),
|
||||
fn=new_fn,
|
||||
depends_on=tuple(_rename(d) for d in spec.depends_on),
|
||||
soft_depends_on=tuple(_rename(d) for d in spec.soft_depends_on),
|
||||
)
|
||||
self._register(new_spec)
|
||||
self._validate_references()
|
||||
self.validate()
|
||||
return self
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# 校验
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
@@ -69,6 +69,22 @@ class RunReport:
|
||||
"""以 FAILED 状态结束的任务名列表。"""
|
||||
return [name for name, r in self.results.items() if r.status == TaskStatus.FAILED]
|
||||
|
||||
def succeeded_tasks(self) -> list[str]:
|
||||
"""以 SUCCESS 状态结束的任务名列表。"""
|
||||
return [name for name, r in self.results.items() if r.status == TaskStatus.SUCCESS]
|
||||
|
||||
def skipped_tasks(self) -> list[str]:
|
||||
"""以 SKIPPED 状态结束的任务名列表。"""
|
||||
return [name for name, r in self.results.items() if r.status == TaskStatus.SKIPPED]
|
||||
|
||||
def tasks_by_status(self, status: TaskStatus) -> list[str]:
|
||||
"""返回指定状态的任务名列表。"""
|
||||
return [name for name, r in self.results.items() if r.status == status]
|
||||
|
||||
def durations(self) -> dict[str, float]:
|
||||
"""任务名 -> 执行时长(秒)。无时长记录的为 0.0。"""
|
||||
return {name: (r.duration or 0.0) for name, r in self.results.items()}
|
||||
|
||||
def describe(self) -> str:
|
||||
"""用于调试的人类可读多行报告。"""
|
||||
lines: list[str] = [f"RunReport(success={self.success})"]
|
||||
|
||||
+95
-36
@@ -72,67 +72,126 @@ def _apply_verbose_to_graph(graph: Graph, verbose: bool) -> Graph:
|
||||
class CliRunner:
|
||||
"""命令行运行器: 根据用户输入执行对应的任务流图.
|
||||
|
||||
将命令名映射到 Graph 实例.
|
||||
通过 ``sys.argv`` 解析用户输入的命令, 执行对应的图.
|
||||
将命令别名映射到 Graph 实例. 通过 ``sys.argv`` 解析用户输入的命令,
|
||||
执行对应的图.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
aliases : dict[str, str | list[str] | Graph]
|
||||
命令别名到任务引用的映射. 每个值可以是:
|
||||
* ``str`` —— 单个任务名 (引用 ``tasks`` 中注册的任务),
|
||||
生成单任务图.
|
||||
* ``list[str]`` —— 任务名列表, 自动 :meth:`Graph.chain` 建立链式依赖,
|
||||
即后一个任务依赖前一个.
|
||||
* :class:`~pyflowx.graph.Graph` —— 直接使用该图 (用于复杂场景, 如
|
||||
自定义 ``conditions``、并行分支等).
|
||||
tasks : list[TaskSpec]
|
||||
扁平注册的任务列表. ``aliases`` 中的字符串引用这些任务名.
|
||||
未被任何 alias 引用的任务不会被执行.
|
||||
strategy : str | Strategy
|
||||
默认执行策略 (``Strategy.SEQUENTIAL`` / ``Strategy.THREAD`` /
|
||||
``Strategy.ASYNC`` 或对应字符串). 可被命令行 ``--strategy`` 覆盖.
|
||||
默认执行策略. 可被命令行 ``--strategy`` 覆盖.
|
||||
description : str
|
||||
CLI 帮助文本.
|
||||
verbose : bool
|
||||
是否显示详细执行过程. ``True`` 时打印任务生命周期和 subprocess 输出.
|
||||
默认 ``True``. 可被命令行 ``--quiet`` 关闭.
|
||||
**graphs : Graph
|
||||
命令名到图的映射. 每个 key 是一个命令名, value 是对应的
|
||||
:class:`~pyflowx.graph.Graph`.
|
||||
是否显示详细执行过程. 默认 ``True``, 可被命令行 ``--quiet`` 关闭.
|
||||
|
||||
Examples
|
||||
--------
|
||||
基本用法::
|
||||
简单场景 (tasks + aliases)::
|
||||
|
||||
runner = px.CliRunner(
|
||||
clean=px.Graph.from_specs(
|
||||
[
|
||||
px.TaskSpec("cargo_clean", cmd=["cargo", "clean"]),
|
||||
]
|
||||
),
|
||||
build=px.Graph.from_specs(
|
||||
[
|
||||
px.TaskSpec("uv_build", cmd=["uv", "build"]),
|
||||
]
|
||||
),
|
||||
tasks=[
|
||||
px.cmd(["uv", "build"]), # name="uv_build"
|
||||
px.cmd(["maturin", "build"], name="maturin_build"),
|
||||
px.cmd(["ruff", "check", "--fix"], name="lint"),
|
||||
],
|
||||
aliases={
|
||||
"b": "uv_build",
|
||||
"ba": ["uv_build", "maturin_build"], # chain: maturin 依赖 uv
|
||||
"lint": "lint",
|
||||
},
|
||||
)
|
||||
runner.run() # 解析 sys.argv
|
||||
runner.run()
|
||||
|
||||
指定策略与描述::
|
||||
复杂场景 (直接用 Graph)::
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy=px.Strategy.THREAD,
|
||||
aliases={
|
||||
"a": px.Graph.from_specs([
|
||||
px.TaskSpec("add", cmd=["git", "add", "."], conditions=(...)),
|
||||
px.TaskSpec("commit", cmd=["git", "commit"], depends_on=("add",)),
|
||||
]),
|
||||
},
|
||||
)
|
||||
runner.run(["test", "--strategy", "sequential"])
|
||||
"""
|
||||
|
||||
graphs: dict[str, Graph] = field(default_factory=dict)
|
||||
strategy: Strategy = field(default="sequential")
|
||||
aliases: dict[str, str | list[str | TaskSpec[Any]] | TaskSpec[Any] | Graph] = field(default_factory=dict)
|
||||
tasks: list[TaskSpec[Any]] = field(default_factory=list)
|
||||
strategy: Strategy = field(default="dependency")
|
||||
description: str = field(default_factory=str)
|
||||
verbose: bool = field(default_factory=lambda: True)
|
||||
# 解析后的命令→图映射,__post_init__ 填充
|
||||
graphs: dict[str, Graph] = field(default_factory=dict, init=False)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not self.graphs:
|
||||
raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)")
|
||||
if not self.aliases:
|
||||
raise ValueError("CliRunner 至少需要一个别名 (通过 aliases= 提供)")
|
||||
|
||||
# 解析并展开字符串引用,委托给 GraphComposer。
|
||||
# Graph 不再 frozen,可直接赋值,无需 object.__setattr__。
|
||||
self.graphs = GraphComposer(self.graphs).resolve_all()
|
||||
# 1. 把 tasks 注册为虚拟命令图(每个 task 一个图),加入 raw_graphs
|
||||
# 使 GraphComposer 能解析对它们的字符串引用
|
||||
raw_graphs: dict[str, Graph] = {}
|
||||
for spec in self.tasks:
|
||||
if spec.name in raw_graphs:
|
||||
raise ValueError(f"任务名重复: {spec.name!r}")
|
||||
raw_graphs[spec.name] = Graph.from_specs([spec])
|
||||
|
||||
# 2. 把每个 alias 转为 Graph(alias 名可与 task 名相同,覆盖 task 注册)
|
||||
for alias, value in self.aliases.items():
|
||||
raw_graphs[alias] = self._alias_to_graph(alias, value)
|
||||
|
||||
# 3. 解析图间字符串引用(str / list[str] 引用其他 alias 或任务)
|
||||
self.graphs = GraphComposer(raw_graphs).resolve_all()
|
||||
|
||||
@staticmethod
|
||||
def _alias_to_graph(
|
||||
alias: str,
|
||||
value: str | list[str | TaskSpec[Any]] | TaskSpec[Any] | Graph,
|
||||
) -> Graph:
|
||||
"""把 alias 的值转换为 Graph.
|
||||
|
||||
* ``str`` —— 对其他 alias 或已注册任务名的引用, 由 GraphComposer 展开.
|
||||
* ``TaskSpec`` —— 单个内联任务, 生成单任务图.
|
||||
* ``list[str | TaskSpec]`` —— 引用/任务混合列表, GraphComposer 展开时
|
||||
自动让后续引用依赖前面 (chain 语义). 元素为 alias 名、任务名或
|
||||
:class:`TaskSpec` 对象 (内联任务).
|
||||
* ``Graph`` —— 原样返回 (用于复杂场景: conditions、并行分支等).
|
||||
"""
|
||||
if isinstance(value, Graph):
|
||||
return value
|
||||
if isinstance(value, TaskSpec):
|
||||
return Graph.from_specs([value])
|
||||
if isinstance(value, str):
|
||||
# 字符串引用,用 _pending_refs 占位,GraphComposer 后续展开
|
||||
return Graph.from_specs([value]) # type: ignore[arg-type]
|
||||
if isinstance(value, list):
|
||||
if not value:
|
||||
raise ValueError(f"别名 {alias!r} 的任务列表为空")
|
||||
for item in value:
|
||||
if not isinstance(item, (str, TaskSpec)):
|
||||
raise TypeError(f"别名 {alias!r} 的列表元素类型无效: {type(item).__name__}, 预期 str 或 TaskSpec")
|
||||
# str/TaskSpec 混合列表,由 GraphComposer 展开(自动建立 chain 依赖)
|
||||
return Graph.from_specs(value)
|
||||
raise TypeError(
|
||||
f"别名 {alias!r} 的值类型无效: {type(value).__name__}, 预期 str/TaskSpec/list[str|TaskSpec]/Graph"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# 内省
|
||||
# ------------------------------------------------------------------ #
|
||||
@property
|
||||
def commands(self) -> list[str]:
|
||||
"""可用的命令列表 (按插入顺序)."""
|
||||
return list(self.graphs.keys())
|
||||
"""可用的命令列表 (按 aliases 定义顺序, 不含 tasks 中未引用的任务)."""
|
||||
return list(self.aliases.keys())
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# 参数解析
|
||||
@@ -225,9 +284,9 @@ class CliRunner:
|
||||
parser.print_help()
|
||||
return CliExitCode.FAILURE.value
|
||||
|
||||
# 验证命令
|
||||
if parsed.command not in self.graphs:
|
||||
available = ", ".join(self.graphs.keys())
|
||||
# 验证命令(必须是已注册的 alias,不接受裸任务名)
|
||||
if parsed.command not in self.aliases:
|
||||
available = ", ".join(self.commands)
|
||||
print(
|
||||
f"错误: 未知命令 {parsed.command!r} (可用命令: {available})",
|
||||
file=sys.stderr,
|
||||
|
||||
@@ -254,6 +254,10 @@ class TaskSpec(Generic[T]):
|
||||
存取状态后端,使不同输入产生独立缓存条目。``None`` 表示用任务名。
|
||||
hooks:
|
||||
:class:`TaskHooks` 生命周期钩子。
|
||||
executor:
|
||||
同步任务的执行器:``"thread"``(默认,线程池)/ ``"process"``
|
||||
(进程池,绕过 GIL,适合 CPU 密集型;``fn`` 须可 pickle)/
|
||||
``"inline"``(直接在事件循环线程调用,最快但会阻塞循环)。
|
||||
"""
|
||||
|
||||
name: str
|
||||
@@ -279,6 +283,7 @@ class TaskSpec(Generic[T]):
|
||||
continue_on_error: bool = False
|
||||
cache_key: CacheKeyFn | None = None
|
||||
hooks: TaskHooks = field(default_factory=TaskHooks)
|
||||
executor: str = "thread" # "thread" | "process" | "inline"
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not self.name:
|
||||
@@ -447,6 +452,121 @@ def _env_and_cwd(
|
||||
# ---------------------------------------------------------------------- #
|
||||
# 任务模板:批量生成相似 TaskSpec 的工厂
|
||||
# ---------------------------------------------------------------------- #
|
||||
def _task_noop() -> None:
|
||||
"""task(cmd=...) 形式下的占位 fn(cmd 任务执行期不调用 fn)。"""
|
||||
return None
|
||||
|
||||
|
||||
def task(
|
||||
fn: TaskFn[Any] | None = None,
|
||||
*,
|
||||
cmd: TaskCmd | None = None,
|
||||
depends_on: tuple[str, ...] = (),
|
||||
soft_depends_on: tuple[str, ...] = (),
|
||||
defaults: Mapping[str, Any] | None = None,
|
||||
args: tuple[Any, ...] = (),
|
||||
kwargs: Mapping[str, Any] | None = None,
|
||||
retry: RetryPolicy | None = None,
|
||||
timeout: float | None = None,
|
||||
tags: tuple[str, ...] = (),
|
||||
conditions: tuple[Condition, ...] = (),
|
||||
cwd: str | Path | None = None,
|
||||
env: Mapping[str, str] | None = None,
|
||||
verbose: bool = False,
|
||||
skip_if_missing: bool = False,
|
||||
allow_upstream_skip: bool = False,
|
||||
strategy: str | None = None,
|
||||
priority: int = 0,
|
||||
concurrency_key: str | None = None,
|
||||
continue_on_error: bool = False,
|
||||
cache_key: CacheKeyFn | None = None,
|
||||
hooks: TaskHooks | None = None,
|
||||
name: str | None = None,
|
||||
) -> Any:
|
||||
"""装饰器:将函数转为 :class:`TaskSpec`。
|
||||
|
||||
``name`` 默认取 ``fn.__name__``。可直接装饰函数,或带参数使用。
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @px.task
|
||||
... def extract(): return [1, 2, 3]
|
||||
>>> @px.task(depends_on=("extract",))
|
||||
... def double(extract): return [x * 2 for x in extract]
|
||||
>>> graph = px.Graph.from_specs([extract, double])
|
||||
"""
|
||||
|
||||
def _decorate(func: TaskFn[Any]) -> TaskSpec[Any]:
|
||||
spec_name = name or func.__name__
|
||||
return TaskSpec(
|
||||
name=spec_name,
|
||||
fn=func,
|
||||
cmd=cmd,
|
||||
depends_on=depends_on,
|
||||
soft_depends_on=soft_depends_on,
|
||||
defaults=dict(defaults) if defaults else {},
|
||||
args=args,
|
||||
kwargs=dict(kwargs) if kwargs else {},
|
||||
retry=retry if retry is not None else RetryPolicy(),
|
||||
timeout=timeout,
|
||||
tags=tags,
|
||||
conditions=conditions,
|
||||
cwd=Path(cwd) if isinstance(cwd, str) else cwd,
|
||||
env=dict(env) if env else None,
|
||||
verbose=verbose,
|
||||
skip_if_missing=skip_if_missing,
|
||||
allow_upstream_skip=allow_upstream_skip,
|
||||
strategy=strategy,
|
||||
priority=priority,
|
||||
concurrency_key=concurrency_key,
|
||||
continue_on_error=continue_on_error,
|
||||
cache_key=cache_key,
|
||||
hooks=hooks if hooks is not None else TaskHooks(),
|
||||
)
|
||||
|
||||
if fn is None and cmd is None:
|
||||
# 带参数调用:@task(depends_on=...),等待被装饰函数
|
||||
return _decorate
|
||||
if fn is None:
|
||||
# task(cmd=..., name=...) 直接构造,无被装饰函数
|
||||
if name is None:
|
||||
raise ValueError("task(cmd=...) 需要显式提供 name")
|
||||
return _decorate(_task_noop)
|
||||
return _decorate(fn)
|
||||
|
||||
|
||||
def cmd(
|
||||
command: list[str],
|
||||
*,
|
||||
name: str | None = None,
|
||||
depends_on: tuple[str, ...] = (),
|
||||
**kwargs: Any,
|
||||
) -> TaskSpec[Any]:
|
||||
"""从命令列表快速创建 :class:`TaskSpec`。
|
||||
|
||||
``name`` 默认为 ``"_".join(command[:2])``(如 ``["uv", "build"]`` → ``"uv_build"``)。
|
||||
若命令不足两个元素则用 ``"_".join(command)``。
|
||||
|
||||
其余关键字参数透传给 :class:`TaskSpec`(如 ``depends_on``、``tags`` 等)。
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> uv_build = px.cmd(["uv", "build"])
|
||||
>>> uv_build.name
|
||||
'uv_build'
|
||||
>>> lint = px.cmd(["ruff", "check", "--fix"], name="lint")
|
||||
>>> lint.name
|
||||
'lint'
|
||||
"""
|
||||
spec_name = name or "_".join(command[:2]) if len(command) >= 2 else "_".join(command)
|
||||
return TaskSpec(
|
||||
name=spec_name,
|
||||
cmd=command,
|
||||
depends_on=depends_on,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
def task_template(
|
||||
fn: TaskFn[Any] | None = None,
|
||||
cmd: TaskCmd | None = None,
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
"""进程池测试辅助:模块级函数(须可 pickle)。"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
|
||||
def cpu_heavy(n: int) -> int:
|
||||
"""CPU 密集型计算(求平方和)。"""
|
||||
return sum(i * i for i in range(n))
|
||||
|
||||
|
||||
def add(a: int, b: int) -> int:
|
||||
"""简单加法。"""
|
||||
return a + b
|
||||
|
||||
|
||||
def sub(a: int, b: int) -> int:
|
||||
"""简单减法。"""
|
||||
return a - b
|
||||
|
||||
|
||||
def slow_sleep(seconds: float) -> int:
|
||||
"""睡眠指定秒数,用于测试超时。"""
|
||||
time.sleep(seconds)
|
||||
return int(seconds)
|
||||
+74
-108
@@ -7,155 +7,121 @@ from unittest.mock import patch
|
||||
import pytest
|
||||
|
||||
from pyflowx.cli import pymake
|
||||
from pyflowx.conditions import Constants
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------- #
|
||||
# maturin_build_cmd
|
||||
# ---------------------------------------------------------------------- #
|
||||
class TestMaturinBuildCmd:
|
||||
"""Test maturin_build_cmd function."""
|
||||
|
||||
def test_returns_list(self) -> None:
|
||||
"""Should return a list."""
|
||||
cmd = pymake.maturin_build_cmd()
|
||||
assert isinstance(cmd, list)
|
||||
|
||||
def test_contains_maturin_build(self) -> None:
|
||||
"""Should contain 'maturin' and 'build'."""
|
||||
cmd = pymake.maturin_build_cmd()
|
||||
assert "maturin" in cmd
|
||||
assert "build" in cmd
|
||||
|
||||
def test_contains_release_flag(self) -> None:
|
||||
"""Should contain release flag '-r'."""
|
||||
cmd = pymake.maturin_build_cmd()
|
||||
assert "-r" in cmd
|
||||
|
||||
def test_windows_includes_target(self) -> None:
|
||||
"""On Windows, should include target-specific flags."""
|
||||
cmd = pymake.maturin_build_cmd()
|
||||
if Constants.IS_WINDOWS:
|
||||
assert "--target" in cmd
|
||||
assert "x86_64-win7-windows-msvc" in cmd
|
||||
assert "-Zbuild-std" in cmd
|
||||
assert "-i" in cmd
|
||||
assert "python3.8" in cmd
|
||||
else:
|
||||
# On non-Windows, should not include Windows-specific flags
|
||||
assert "--target" not in cmd
|
||||
|
||||
def test_does_not_mutate_on_multiple_calls(self) -> None:
|
||||
"""Multiple calls should return independent lists."""
|
||||
cmd1 = pymake.maturin_build_cmd()
|
||||
cmd2 = pymake.maturin_build_cmd()
|
||||
assert cmd1 == cmd2
|
||||
# Mutating one should not affect the other
|
||||
cmd1.append("extra")
|
||||
assert "extra" not in cmd2
|
||||
|
||||
def test_non_windows_excludes_target_flags(self) -> None:
|
||||
"""On non-Windows, should not include Windows-specific flags (覆盖 22->32 分支)."""
|
||||
from unittest.mock import patch
|
||||
|
||||
with patch.object(pymake.Constants, "IS_WINDOWS", False):
|
||||
cmd = pymake.maturin_build_cmd()
|
||||
assert "maturin" in cmd
|
||||
assert "build" in cmd
|
||||
assert "-r" in cmd
|
||||
assert "--target" not in cmd
|
||||
assert "-Zbuild-std" not in cmd
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------- #
|
||||
# TaskSpec definitions
|
||||
# ---------------------------------------------------------------------- #
|
||||
def _find_task(name: str) -> pymake.px.TaskSpec:
|
||||
"""从 pymake.tasks 或 aliases 中查找指定名称的 TaskSpec."""
|
||||
for spec in pymake.tasks:
|
||||
if spec.name == name:
|
||||
return spec
|
||||
# 单任务别名(doc/lint/tox)内联在 aliases dict 中
|
||||
value = pymake.aliases.get(name)
|
||||
if isinstance(value, pymake.px.TaskSpec):
|
||||
return value
|
||||
raise KeyError(f"任务 {name!r} 未找到")
|
||||
|
||||
|
||||
class TestTaskSpecDefinitions:
|
||||
"""Test that all TaskSpec definitions are valid."""
|
||||
|
||||
def test_uv_build_spec(self) -> None:
|
||||
"""uv_build spec should be properly defined."""
|
||||
assert pymake.uv_build.name == "uv_build"
|
||||
assert pymake.uv_build.cmd == ["uv", "build"]
|
||||
assert pymake.uv_build.skip_if_missing is False
|
||||
spec = _find_task("uv_build")
|
||||
assert spec.name == "uv_build"
|
||||
assert spec.cmd == ["uv", "build"]
|
||||
assert spec.skip_if_missing is False
|
||||
|
||||
def test_maturin_build_spec(self) -> None:
|
||||
"""maturin_build spec should be properly defined."""
|
||||
assert pymake.maturin_build.name == "maturin_build"
|
||||
assert isinstance(pymake.maturin_build.cmd, list)
|
||||
assert pymake.maturin_build.skip_if_missing is False
|
||||
spec = _find_task("maturin_build")
|
||||
assert spec.name == "maturin_build"
|
||||
assert isinstance(spec.cmd, list)
|
||||
assert spec.skip_if_missing is False
|
||||
|
||||
def test_uv_sync_spec(self) -> None:
|
||||
"""uv_sync spec should be properly defined."""
|
||||
assert pymake.uv_sync.name == "uv_sync"
|
||||
assert pymake.uv_sync.cmd == ["uv", "sync"]
|
||||
assert pymake.uv_sync.skip_if_missing is False
|
||||
spec = _find_task("uv_sync")
|
||||
assert spec.name == "uv_sync"
|
||||
assert spec.cmd == ["uv", "sync"]
|
||||
assert spec.skip_if_missing is False
|
||||
|
||||
def test_git_clean_spec(self) -> None:
|
||||
"""git_clean spec should be properly defined."""
|
||||
assert pymake.git_clean.name == "git_clean"
|
||||
assert pymake.git_clean.cmd == ["gitt", "c"]
|
||||
assert pymake.git_clean.skip_if_missing is False
|
||||
spec = _find_task("git_clean")
|
||||
assert spec.name == "git_clean"
|
||||
assert spec.cmd == ["gitt", "c"]
|
||||
assert spec.skip_if_missing is False
|
||||
|
||||
def test_test_spec(self) -> None:
|
||||
"""test spec should be properly defined."""
|
||||
assert pymake.test.name == "test"
|
||||
assert isinstance(pymake.test.cmd, list)
|
||||
assert "pytest" in pymake.test.cmd
|
||||
assert "-m" in pymake.test.cmd
|
||||
assert "not slow" in pymake.test.cmd
|
||||
assert pymake.test.skip_if_missing is False
|
||||
spec = _find_task("test")
|
||||
assert spec.name == "test"
|
||||
assert isinstance(spec.cmd, list)
|
||||
assert "pytest" in spec.cmd
|
||||
assert "-m" in spec.cmd
|
||||
assert "not slow" in spec.cmd
|
||||
assert spec.skip_if_missing is False
|
||||
|
||||
def test_test_fast_spec(self) -> None:
|
||||
"""test_fast spec should be properly defined."""
|
||||
assert pymake.test_fast.name == "test_fast"
|
||||
assert isinstance(pymake.test_fast.cmd, list)
|
||||
assert "pytest" in pymake.test_fast.cmd
|
||||
assert "-n" not in pymake.test_fast.cmd # test_fast doesn't use parallel
|
||||
assert pymake.test_fast.skip_if_missing is False
|
||||
spec = _find_task("test_fast")
|
||||
assert spec.name == "test_fast"
|
||||
assert isinstance(spec.cmd, list)
|
||||
assert "pytest" in spec.cmd
|
||||
assert "-n" not in spec.cmd # test_fast doesn't use parallel
|
||||
assert spec.skip_if_missing is False
|
||||
|
||||
def test_test_coverage_spec(self) -> None:
|
||||
"""test_coverage spec should be properly defined."""
|
||||
assert pymake.test_coverage.name == "test_coverage"
|
||||
assert isinstance(pymake.test_coverage.cmd, list)
|
||||
assert "pytest" in pymake.test_coverage.cmd
|
||||
assert "--cov" in pymake.test_coverage.cmd
|
||||
assert pymake.test_coverage.skip_if_missing is False
|
||||
spec = _find_task("test_coverage")
|
||||
assert spec.name == "test_coverage"
|
||||
assert isinstance(spec.cmd, list)
|
||||
assert "pytest" in spec.cmd
|
||||
assert "--cov" in spec.cmd
|
||||
assert spec.skip_if_missing is False
|
||||
|
||||
def test_ruff_lint_spec(self) -> None:
|
||||
"""ruff_lint spec should be properly defined."""
|
||||
assert pymake.ruff_lint.name == "lint"
|
||||
assert isinstance(pymake.ruff_lint.cmd, list)
|
||||
assert "ruff" in pymake.ruff_lint.cmd
|
||||
assert "check" in pymake.ruff_lint.cmd
|
||||
assert pymake.ruff_lint.skip_if_missing is False
|
||||
"""lint spec should be properly defined."""
|
||||
spec = _find_task("lint")
|
||||
assert spec.name == "lint"
|
||||
assert isinstance(spec.cmd, list)
|
||||
assert "ruff" in spec.cmd
|
||||
assert "check" in spec.cmd
|
||||
assert spec.skip_if_missing is False
|
||||
|
||||
def test_doc_spec(self) -> None:
|
||||
"""doc spec should be properly defined."""
|
||||
assert pymake.doc.name == "doc"
|
||||
assert isinstance(pymake.doc.cmd, list)
|
||||
assert "sphinx-build" in pymake.doc.cmd
|
||||
assert pymake.doc.skip_if_missing is False
|
||||
spec = _find_task("doc")
|
||||
assert spec.name == "doc"
|
||||
assert isinstance(spec.cmd, list)
|
||||
assert "sphinx-build" in spec.cmd
|
||||
assert spec.skip_if_missing is False
|
||||
|
||||
def test_hatch_publish_spec(self) -> None:
|
||||
"""hatch_publish spec should be properly defined."""
|
||||
assert pymake.hatch_publish.name == "publish_python"
|
||||
assert pymake.hatch_publish.cmd == ["hatch", "publish"]
|
||||
assert pymake.hatch_publish.skip_if_missing is False
|
||||
"""publish_python spec should be properly defined."""
|
||||
spec = _find_task("publish_python")
|
||||
assert spec.name == "publish_python"
|
||||
assert spec.cmd == ["hatch", "publish"]
|
||||
assert spec.skip_if_missing is False
|
||||
|
||||
def test_twine_publish_spec(self) -> None:
|
||||
"""twine_publish spec should be properly defined."""
|
||||
assert pymake.twine_publish.name == "twine_publish"
|
||||
assert isinstance(pymake.twine_publish.cmd, list)
|
||||
assert "twine" in pymake.twine_publish.cmd
|
||||
assert "upload" in pymake.twine_publish.cmd
|
||||
assert pymake.twine_publish.skip_if_missing is False
|
||||
spec = _find_task("twine_publish")
|
||||
assert spec.name == "twine_publish"
|
||||
assert isinstance(spec.cmd, list)
|
||||
assert "twine" in spec.cmd
|
||||
assert "upload" in spec.cmd
|
||||
assert spec.skip_if_missing is False
|
||||
|
||||
def test_tox_spec(self) -> None:
|
||||
"""tox spec should be properly defined."""
|
||||
assert pymake.tox.name == "tox"
|
||||
assert pymake.tox.cmd == ["tox", "-p", "auto"]
|
||||
assert pymake.tox.skip_if_missing is False
|
||||
spec = _find_task("tox")
|
||||
assert spec.name == "tox"
|
||||
assert spec.cmd == ["tox", "-p", "auto"]
|
||||
assert spec.skip_if_missing is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------- #
|
||||
|
||||
@@ -1,9 +1,16 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
# 将 tests 目录加入 sys.path,使进程池测试能 import _proc_helper 模块级辅助函数。
|
||||
# 进程池 pickle 要求被调用函数为模块级,conftest.py 在 xdist worker 中也会执行。
|
||||
_TESTS_DIR = str(Path(__file__).resolve().parent)
|
||||
if _TESTS_DIR not in sys.path:
|
||||
sys.path.insert(0, _TESTS_DIR)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def packtool_tmp_workdir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
"""Tests for Graph.chain DSL."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pyflowx as px
|
||||
from pyflowx.task import TaskSpec
|
||||
|
||||
|
||||
def _fn() -> None:
|
||||
return None
|
||||
|
||||
|
||||
def test_chain_basic_linkage() -> None:
|
||||
"""chain(a, b, c) 应建立 a->b->c 依赖."""
|
||||
a = TaskSpec("a", _fn)
|
||||
b = TaskSpec("b", _fn)
|
||||
c = TaskSpec("c", _fn)
|
||||
|
||||
graph = px.Graph().chain(a, b, c)
|
||||
|
||||
assert graph.all_specs()["b"].depends_on == ("a",)
|
||||
assert graph.all_specs()["c"].depends_on == ("b",)
|
||||
assert graph.all_specs()["a"].depends_on == ()
|
||||
|
||||
|
||||
def test_chain_single_spec() -> None:
|
||||
"""chain(a) 应只注册 a,无依赖."""
|
||||
a = TaskSpec("a", _fn)
|
||||
graph = px.Graph().chain(a)
|
||||
assert "a" in graph
|
||||
assert graph.all_specs()["a"].depends_on == ()
|
||||
|
||||
|
||||
def test_chain_preserves_existing_deps() -> None:
|
||||
"""chain 应保留 spec 已有的 depends_on."""
|
||||
a = TaskSpec("a", _fn)
|
||||
b = TaskSpec("b", _fn)
|
||||
c = TaskSpec("c", _fn, depends_on=("b",))
|
||||
|
||||
graph = px.Graph().chain(a, b, c)
|
||||
# c 已有 depends_on=('b',),前驱是 b,已在依赖中,不重复添加
|
||||
assert graph.all_specs()["c"].depends_on == ("b",)
|
||||
|
||||
|
||||
def test_chain_merges_existing_deps() -> None:
|
||||
"""chain 应将前驱追加到已有依赖前(若不存在)."""
|
||||
a = TaskSpec("a", _fn)
|
||||
x = TaskSpec("x", _fn)
|
||||
c = TaskSpec("c", _fn, depends_on=("x",))
|
||||
|
||||
graph = px.Graph().chain(a, x, c)
|
||||
# c 前驱是 x,但 c 已依赖 x,不重复
|
||||
assert graph.all_specs()["c"].depends_on == ("x",)
|
||||
|
||||
|
||||
def test_chain_returns_self() -> None:
|
||||
"""chain 返回 self 支持链式调用."""
|
||||
a = TaskSpec("a", _fn)
|
||||
graph = px.Graph()
|
||||
assert graph.chain(a) is graph
|
||||
|
||||
|
||||
def test_chain_execution_order() -> None:
|
||||
"""chain 应保证执行顺序."""
|
||||
order: list[str] = []
|
||||
|
||||
def make(name: str):
|
||||
def fn() -> str:
|
||||
order.append(name)
|
||||
return name
|
||||
return fn
|
||||
|
||||
a = TaskSpec("a", make("a"))
|
||||
b = TaskSpec("b", make("b"))
|
||||
c = TaskSpec("c", make("c"))
|
||||
|
||||
graph = px.Graph().chain(a, b, c)
|
||||
report = px.run(graph)
|
||||
assert report.success
|
||||
assert order == ["a", "b", "c"]
|
||||
|
||||
|
||||
def test_chain_with_decorator_specs() -> None:
|
||||
"""chain 应与 @task 装饰器配合."""
|
||||
|
||||
@px.task
|
||||
def extract() -> int:
|
||||
return 1
|
||||
|
||||
@px.task
|
||||
def transform(extract: int) -> int:
|
||||
return extract + 10
|
||||
|
||||
@px.task
|
||||
def load(transform: int) -> int:
|
||||
return transform + 100
|
||||
|
||||
graph = px.Graph().chain(extract, transform, load)
|
||||
report = px.run(graph)
|
||||
assert report.success
|
||||
assert report["load"] == 111
|
||||
+19
-19
@@ -17,7 +17,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"build": px.Graph.from_specs([build_task]),
|
||||
"test": px.Graph.from_specs([test_task]),
|
||||
"all": px.Graph.from_specs([build_task, "test"]),
|
||||
@@ -38,7 +38,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs([task1]),
|
||||
"cmd2": px.Graph.from_specs([task2]),
|
||||
"cmd3": px.Graph.from_specs([task3]),
|
||||
@@ -57,7 +57,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"lint": px.Graph.from_specs([lint_task, format_task]),
|
||||
"quick": px.Graph.from_specs(["lint.lint"]),
|
||||
},
|
||||
@@ -75,7 +75,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs([task1]),
|
||||
"cmd2": px.Graph.from_specs(["cmd1", task2]),
|
||||
"cmd3": px.Graph.from_specs(["cmd2", task3]),
|
||||
@@ -93,7 +93,7 @@ class TestCommandReferences:
|
||||
with pytest.raises(ValueError, match="循环引用"):
|
||||
px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs(["cmd1", task1]),
|
||||
},
|
||||
)
|
||||
@@ -105,7 +105,7 @@ class TestCommandReferences:
|
||||
with pytest.raises(ValueError, match="引用的命令 'invalid' 不存在"):
|
||||
px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs(["invalid", task1]),
|
||||
},
|
||||
)
|
||||
@@ -117,7 +117,7 @@ class TestCommandReferences:
|
||||
with pytest.raises(ValueError, match="任务 'invalid' 不存在于命令 'cmd1' 中"):
|
||||
px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs([task1]),
|
||||
"cmd2": px.Graph.from_specs(["cmd1.invalid"]),
|
||||
},
|
||||
@@ -130,7 +130,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs([task1, task2]),
|
||||
"cmd2": px.Graph.from_specs(["cmd1"]),
|
||||
},
|
||||
@@ -148,7 +148,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs([task1, task2]),
|
||||
"cmd2": px.Graph.from_specs(["cmd1", task3]),
|
||||
},
|
||||
@@ -168,7 +168,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs([task1]),
|
||||
"cmd2": px.Graph.from_specs([task2, task3]),
|
||||
"cmd3": px.Graph.from_specs([task4]),
|
||||
@@ -205,7 +205,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs([task1]),
|
||||
"cmd2": px.Graph.from_specs([task2]),
|
||||
"all": px.Graph.from_specs(["cmd1", "cmd2", task3, task4, task5]),
|
||||
@@ -242,7 +242,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs([task1, task2]),
|
||||
"cmd2": px.Graph.from_specs([task3]),
|
||||
"all": px.Graph.from_specs(["cmd1", "cmd2", task4]),
|
||||
@@ -279,7 +279,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"c": px.Graph.from_specs([git_clean]),
|
||||
"tc": px.Graph.from_specs([typecheck, "lint"]),
|
||||
"lint": px.Graph.from_specs([lint, format_task]),
|
||||
@@ -319,7 +319,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs([task1]),
|
||||
"cmd2": px.Graph.from_specs([task2]),
|
||||
"cmd3": px.Graph.from_specs([task3]),
|
||||
@@ -350,7 +350,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"all": px.Graph.from_specs([task1, task2, task3]),
|
||||
},
|
||||
)
|
||||
@@ -373,7 +373,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs([task1, task2]),
|
||||
"all": px.Graph.from_specs(["cmd1"]),
|
||||
},
|
||||
@@ -399,7 +399,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs([task1]),
|
||||
"cmd2": px.Graph.from_specs(["cmd1", task2]),
|
||||
"cmd3": px.Graph.from_specs(["cmd2", task3]),
|
||||
@@ -430,7 +430,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"cmd1": px.Graph.from_specs([task1, task2]), # Parallel tasks
|
||||
"cmd2": px.Graph.from_specs([task3, task4]), # Parallel tasks
|
||||
"all": px.Graph.from_specs(["cmd1", "cmd2"]),
|
||||
@@ -465,7 +465,7 @@ class TestCommandReferences:
|
||||
|
||||
runner = px.CliRunner(
|
||||
strategy="sequential",
|
||||
graphs={
|
||||
aliases={
|
||||
"clean": px.Graph.from_specs([clean]),
|
||||
"build": px.Graph.from_specs([build1, build2]),
|
||||
"test": px.Graph.from_specs([test1, test2]),
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
"""Tests for process executor (spec.executor='process')."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
# pyrefly: ignore[missing-import]
|
||||
from _proc_helper import add, cpu_heavy, slow_sleep, sub
|
||||
|
||||
import pyflowx as px
|
||||
from pyflowx.errors import TaskFailedError
|
||||
|
||||
|
||||
def test_process_executor_runs_cpu_task() -> None:
|
||||
"""executor='process' 应在进程池中执行 CPU 密集型任务."""
|
||||
spec = px.TaskSpec("cpu", fn=cpu_heavy, args=(1000,), executor="process")
|
||||
graph = px.Graph.from_specs([spec])
|
||||
report = px.run(graph)
|
||||
assert report.success
|
||||
assert report["cpu"] == sum(i * i for i in range(1000))
|
||||
|
||||
|
||||
def test_process_executor_with_dependency() -> None:
|
||||
"""进程池任务应支持依赖注入."""
|
||||
spec1 = px.TaskSpec("a", fn=cpu_heavy, args=(100,), executor="process")
|
||||
spec2 = px.TaskSpec("b", fn=add, args=(3, 4), executor="process", depends_on=("a",))
|
||||
graph = px.Graph.from_specs([spec1, spec2])
|
||||
report = px.run(graph)
|
||||
assert report.success
|
||||
assert report["b"] == 7
|
||||
|
||||
|
||||
def test_process_executor_default_is_thread() -> None:
|
||||
"""TaskSpec.executor 默认应为 'thread'."""
|
||||
spec = px.TaskSpec("x", fn=lambda: None)
|
||||
assert spec.executor == "thread"
|
||||
|
||||
|
||||
def test_inline_executor_runs_in_event_loop() -> None:
|
||||
"""executor='inline' 应直接在事件循环线程调用."""
|
||||
spec = px.TaskSpec("inline", fn=add, args=(10, 20), executor="inline")
|
||||
graph = px.Graph.from_specs([spec])
|
||||
report = px.run(graph)
|
||||
assert report.success
|
||||
assert report["inline"] == 30
|
||||
|
||||
|
||||
def test_process_executor_with_kwargs() -> None:
|
||||
"""进程池任务应支持 kwargs 注入."""
|
||||
spec = px.TaskSpec("kw", fn=sub, args=(10,), kwargs={"b": 3}, executor="process")
|
||||
graph = px.Graph.from_specs([spec])
|
||||
report = px.run(graph)
|
||||
assert report.success
|
||||
assert report["kw"] == 7
|
||||
|
||||
|
||||
def test_process_executor_timeout() -> None:
|
||||
"""进程池任务超时应抛 TaskFailedError."""
|
||||
spec = px.TaskSpec("slow", fn=slow_sleep, args=(10.0,), executor="process", timeout=0.1)
|
||||
graph = px.Graph.from_specs([spec])
|
||||
with pytest.raises(TaskFailedError):
|
||||
px.run(graph)
|
||||
@@ -0,0 +1,152 @@
|
||||
"""Tests for Graph namespace and add_subgraph."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
import pyflowx as px
|
||||
|
||||
|
||||
def _fn() -> None:
|
||||
return None
|
||||
|
||||
|
||||
def test_graph_namespace_field_default_none() -> None:
|
||||
"""Graph 默认 namespace 为 None."""
|
||||
graph = px.Graph()
|
||||
assert graph.namespace is None
|
||||
|
||||
|
||||
def test_graph_from_specs_with_namespace() -> None:
|
||||
"""from_specs(namespace=...) 应设置 graph.namespace."""
|
||||
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)], namespace="ns1")
|
||||
assert graph.namespace == "ns1"
|
||||
|
||||
|
||||
def test_add_subgraph_prefixes_task_names() -> None:
|
||||
"""add_subgraph 应给子图任务名加命名空间前缀."""
|
||||
sub = px.Graph.from_specs(
|
||||
[px.TaskSpec("extract", _fn), px.TaskSpec("build", _fn, depends_on=("extract",))],
|
||||
namespace="build",
|
||||
)
|
||||
main = px.Graph.from_specs([px.TaskSpec("start", _fn)])
|
||||
main.add_subgraph(sub)
|
||||
|
||||
assert "start" in main
|
||||
assert "build:extract" in main
|
||||
assert "build:build" in main
|
||||
|
||||
|
||||
def test_add_subgraph_renames_internal_deps() -> None:
|
||||
"""add_subgraph 应给子图内部依赖名加前缀."""
|
||||
sub = px.Graph.from_specs(
|
||||
[px.TaskSpec("a", _fn), px.TaskSpec("b", _fn, depends_on=("a",))],
|
||||
namespace="ns",
|
||||
)
|
||||
main = px.Graph()
|
||||
main.add_subgraph(sub)
|
||||
|
||||
b_spec = main.all_specs()["ns:b"]
|
||||
assert b_spec.depends_on == ("ns:a",)
|
||||
|
||||
|
||||
def test_add_subgraph_all_internal_deps_prefixed() -> None:
|
||||
"""add_subgraph 子图内所有任务(含被依赖的)都加前缀."""
|
||||
sub = px.Graph.from_specs(
|
||||
[px.TaskSpec("ext", _fn), px.TaskSpec("b", _fn, depends_on=("ext",))],
|
||||
namespace="ns",
|
||||
)
|
||||
main = px.Graph()
|
||||
main.add_subgraph(sub)
|
||||
|
||||
b_spec = main.all_specs()["ns:b"]
|
||||
assert b_spec.depends_on == ("ns:ext",)
|
||||
assert "ns:ext" in main
|
||||
|
||||
|
||||
def test_add_subgraph_requires_namespace() -> None:
|
||||
"""add_subgraph 无 namespace 时应抛 ValueError."""
|
||||
sub = px.Graph.from_specs([px.TaskSpec("a", _fn)]) # 无 namespace
|
||||
main = px.Graph()
|
||||
with pytest.raises(ValueError, match="namespace"):
|
||||
main.add_subgraph(sub)
|
||||
|
||||
|
||||
def test_add_subgraph_explicit_namespace_overrides() -> None:
|
||||
"""add_subgraph(namespace=...) 应覆盖子图自带 namespace."""
|
||||
sub = px.Graph.from_specs([px.TaskSpec("a", _fn)], namespace="original")
|
||||
main = px.Graph()
|
||||
main.add_subgraph(sub, namespace="override")
|
||||
|
||||
assert "override:a" in main
|
||||
assert "original:a" not in main
|
||||
|
||||
|
||||
def test_add_subgraph_internal_injection_works() -> None:
|
||||
"""子图内部依赖注入应通过 wrapper 正常工作."""
|
||||
sub = px.Graph.from_specs(
|
||||
[
|
||||
px.TaskSpec("extract", lambda: [1, 2, 3]),
|
||||
px.TaskSpec("build", lambda extract: [x * 2 for x in extract], depends_on=("extract",)),
|
||||
],
|
||||
namespace="build",
|
||||
)
|
||||
main = px.Graph()
|
||||
main.add_subgraph(sub)
|
||||
|
||||
report = px.run(main)
|
||||
assert report.success
|
||||
assert report["build:build"] == [2, 4, 6]
|
||||
|
||||
|
||||
def test_add_subgraph_cross_namespace_ref_via_context() -> None:
|
||||
"""跨命名空间引用应通过 Context 标注接收."""
|
||||
|
||||
def consumer(ctx: px.Context) -> str:
|
||||
return f"got {ctx['ns:data']}"
|
||||
|
||||
sub = px.Graph.from_specs(
|
||||
[px.TaskSpec("data", lambda: "data_value")],
|
||||
namespace="ns",
|
||||
)
|
||||
main = px.Graph()
|
||||
main.add_subgraph(sub)
|
||||
|
||||
main.add(px.TaskSpec("consumer", consumer, depends_on=("ns:data",)))
|
||||
|
||||
report = px.run(main)
|
||||
assert report.success
|
||||
assert report["consumer"] == "got data_value"
|
||||
|
||||
|
||||
def test_add_subgraph_context_annotation_in_subgraph() -> None:
|
||||
"""子图内部任务用 Context 标注时,wrapper 应正确传递."""
|
||||
|
||||
def sink(ctx: px.Context) -> int:
|
||||
return ctx["src"]
|
||||
|
||||
sub = px.Graph.from_specs(
|
||||
[
|
||||
px.TaskSpec("src", lambda: 42),
|
||||
px.TaskSpec("sink", sink, depends_on=("src",)),
|
||||
],
|
||||
namespace="ns",
|
||||
)
|
||||
main = px.Graph()
|
||||
main.add_subgraph(sub)
|
||||
|
||||
report = px.run(main)
|
||||
assert report.success
|
||||
assert report["ns:sink"] == 42
|
||||
|
||||
|
||||
def test_add_subgraph_chained() -> None:
|
||||
"""多个子图可链式合并到主图."""
|
||||
sub_a = px.Graph.from_specs([px.TaskSpec("a", _fn)], namespace="nsA")
|
||||
sub_b = px.Graph.from_specs([px.TaskSpec("b", _fn)], namespace="nsB")
|
||||
|
||||
main = px.Graph()
|
||||
main.add_subgraph(sub_a).add_subgraph(sub_b)
|
||||
|
||||
assert "nsA:a" in main
|
||||
assert "nsB:b" in main
|
||||
@@ -126,3 +126,50 @@ class TestRunReportDescribe:
|
||||
report.results["a"] = TaskResult[Any](spec=spec, status=TaskStatus.PENDING)
|
||||
desc = report.describe()
|
||||
assert "-" in desc # duration 显示为 "-"
|
||||
|
||||
|
||||
class TestRunReportQueries:
|
||||
"""测试 RunReport 的新查询 API."""
|
||||
|
||||
def test_succeeded_tasks(self) -> None:
|
||||
"""succeeded_tasks 返回 SUCCESS 状态的任务名."""
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _make_result("a", status=TaskStatus.SUCCESS)
|
||||
report.results["b"] = _make_result("b", status=TaskStatus.FAILED)
|
||||
report.results["c"] = _make_result("c", status=TaskStatus.SUCCESS)
|
||||
assert report.succeeded_tasks() == ["a", "c"]
|
||||
|
||||
def test_skipped_tasks(self) -> None:
|
||||
"""skipped_tasks 返回 SKIPPED 状态的任务名."""
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _make_result("a", status=TaskStatus.SKIPPED)
|
||||
report.results["b"] = _make_result("b", status=TaskStatus.SUCCESS)
|
||||
assert report.skipped_tasks() == ["a"]
|
||||
|
||||
def test_tasks_by_status(self) -> None:
|
||||
"""tasks_by_status 按指定状态过滤."""
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _make_result("a", status=TaskStatus.FAILED)
|
||||
report.results["b"] = _make_result("b", status=TaskStatus.FAILED)
|
||||
report.results["c"] = _make_result("c", status=TaskStatus.SUCCESS)
|
||||
assert report.tasks_by_status(TaskStatus.FAILED) == ["a", "b"]
|
||||
assert report.tasks_by_status(TaskStatus.SUCCESS) == ["c"]
|
||||
assert report.tasks_by_status(TaskStatus.SKIPPED) == []
|
||||
|
||||
def test_durations(self) -> None:
|
||||
"""durations 返回任务名 -> 时长映射."""
|
||||
report = px.RunReport()
|
||||
report.results["a"] = _make_result("a", duration=1.5)
|
||||
report.results["b"] = _make_result("b", duration=2.0)
|
||||
durs = report.durations()
|
||||
assert durs["a"] == 1.5
|
||||
assert durs["b"] == 2.0
|
||||
|
||||
def test_durations_no_duration(self) -> None:
|
||||
"""无时长的任务应返回 0.0."""
|
||||
report = px.RunReport()
|
||||
spec: TaskSpec[Any] = TaskSpec[Any]("a", _fn) # type: ignore[arg-type]
|
||||
report.results["a"] = TaskResult[Any](spec=spec, status=TaskStatus.PENDING)
|
||||
durs = report.durations()
|
||||
assert durs["a"] == 0.0
|
||||
|
||||
|
||||
+177
-65
@@ -53,18 +53,18 @@ class TestCliRunnerConstruction:
|
||||
|
||||
def test_requires_at_least_one_command(self) -> None:
|
||||
"""没有命令时应抛出 ValueError."""
|
||||
with pytest.raises(ValueError, match="至少需要一个命令"):
|
||||
with pytest.raises(ValueError, match="至少需要一个别名"):
|
||||
_ = px.CliRunner()
|
||||
|
||||
def test_accepts_single_graph(self) -> None:
|
||||
"""单个命令应正常构造."""
|
||||
runner = px.CliRunner(graphs={"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
assert runner.commands == ["clean"]
|
||||
|
||||
def test_accepts_multiple_graphs(self) -> None:
|
||||
"""多个命令应按插入顺序保留."""
|
||||
runner = px.CliRunner(
|
||||
graphs={
|
||||
aliases={
|
||||
"clean": _echo_graph("c", "clean"),
|
||||
"build": _echo_graph("b", "build"),
|
||||
"test": _echo_graph("t", "test"),
|
||||
@@ -72,39 +72,39 @@ class TestCliRunnerConstruction:
|
||||
)
|
||||
assert runner.commands == ["clean", "build", "test"]
|
||||
|
||||
def test_default_strategy_is_sequential(self) -> None:
|
||||
"""默认策略应为 Strategy.SEQUENTIAL."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
assert runner.strategy == "sequential"
|
||||
def test_default_strategy_is_dependency(self) -> None:
|
||||
"""默认策略应为 dependency(依赖驱动,最大并行度)."""
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
assert runner.strategy == "dependency"
|
||||
|
||||
def test_custom_strategy_string(self) -> None:
|
||||
"""应支持通过字符串指定策略."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()}, strategy="thread")
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()}, strategy="thread")
|
||||
assert runner.strategy == "thread"
|
||||
|
||||
def test_custom_strategy_enum(self) -> None:
|
||||
"""应支持通过 Strategy 枚举指定策略."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()}, strategy="async")
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()}, strategy="async")
|
||||
assert runner.strategy == "async"
|
||||
|
||||
def test_default_verbose_is_true(self) -> None:
|
||||
"""默认 verbose 应为 True."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
assert runner.verbose is True
|
||||
|
||||
def test_custom_verbose_false(self) -> None:
|
||||
"""应支持关闭 verbose."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()}, verbose=False)
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()}, verbose=False)
|
||||
assert runner.verbose is False
|
||||
|
||||
def test_default_description_is_empty(self) -> None:
|
||||
"""默认描述应为空字符串."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
assert runner.description == ""
|
||||
|
||||
def test_custom_description(self) -> None:
|
||||
"""应支持自定义描述."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()}, description="My CLI")
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()}, description="My CLI")
|
||||
assert runner.description == "My CLI"
|
||||
|
||||
|
||||
@@ -116,13 +116,13 @@ class TestCliRunnerProperties:
|
||||
|
||||
def test_commands_returns_list(self) -> None:
|
||||
"""commands 应返回列表."""
|
||||
runner = px.CliRunner({"a": _echo_graph(), "b": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"a": _echo_graph(), "b": _echo_graph()})
|
||||
assert isinstance(runner.commands, list)
|
||||
|
||||
def test_graphs_contains_original_graphs(self) -> None:
|
||||
"""graphs 应包含原始 Graph 实例."""
|
||||
g = _echo_graph()
|
||||
runner = px.CliRunner({"cmd": g})
|
||||
runner = px.CliRunner(aliases={"cmd": g})
|
||||
assert runner.graphs["cmd"] is g
|
||||
|
||||
|
||||
@@ -136,69 +136,69 @@ class TestCliRunnerParser:
|
||||
"""create_parser 应返回 ArgumentParser."""
|
||||
from argparse import ArgumentParser
|
||||
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
parser = runner.create_parser()
|
||||
assert isinstance(parser, ArgumentParser)
|
||||
|
||||
def test_parser_has_command_argument(self) -> None:
|
||||
"""解析器应有 command 位置参数."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
parser = runner.create_parser()
|
||||
parsed = parser.parse_args(["clean"])
|
||||
assert parsed.command == "clean"
|
||||
|
||||
def test_parser_command_is_optional(self) -> None:
|
||||
"""command 应为可选参数."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
parser = runner.create_parser()
|
||||
parsed = parser.parse_args([])
|
||||
assert parsed.command is None
|
||||
|
||||
def test_parser_has_strategy_option(self) -> None:
|
||||
"""解析器应有 --strategy 选项."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
parser = runner.create_parser()
|
||||
parsed = parser.parse_args(["clean", "--strategy", "thread"])
|
||||
assert parsed.strategy == "thread"
|
||||
|
||||
def test_parser_strategy_default(self) -> None:
|
||||
"""--strategy 默认值应与构造时一致."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()}, strategy="async")
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()}, strategy="async")
|
||||
parser = runner.create_parser()
|
||||
parsed = parser.parse_args(["clean"])
|
||||
assert parsed.strategy == "async"
|
||||
|
||||
def test_parser_has_dry_run_flag(self) -> None:
|
||||
"""解析器应有 --dry-run 标志."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
parser = runner.create_parser()
|
||||
parsed = parser.parse_args(["clean", "--dry-run"])
|
||||
assert parsed.dry_run is True
|
||||
|
||||
def test_parser_dry_run_default_false(self) -> None:
|
||||
"""--dry-run 默认为 False."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
parser = runner.create_parser()
|
||||
parsed = parser.parse_args(["clean"])
|
||||
assert parsed.dry_run is False
|
||||
|
||||
def test_parser_has_list_flag(self) -> None:
|
||||
"""解析器应有 --list 标志."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
parser = runner.create_parser()
|
||||
parsed = parser.parse_args(["--list"])
|
||||
assert parsed.list is True
|
||||
|
||||
def test_parser_has_quiet_flag(self) -> None:
|
||||
"""解析器应有 --quiet 标志."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
parser = runner.create_parser()
|
||||
parsed = parser.parse_args(["clean", "--quiet"])
|
||||
assert parsed.quiet is True
|
||||
|
||||
def test_parser_quiet_default_false(self) -> None:
|
||||
"""--quiet 默认为 False."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
parser = runner.create_parser()
|
||||
parsed = parser.parse_args(["clean"])
|
||||
assert parsed.quiet is False
|
||||
@@ -222,7 +222,7 @@ class TestCliRunnerRunSuccess:
|
||||
|
||||
def test_run_valid_command_returns_zero(self) -> None:
|
||||
"""有效命令执行成功应返回 0."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
exit_code = runner.run(["clean"])
|
||||
assert exit_code == CliExitCode.SUCCESS.value
|
||||
|
||||
@@ -236,28 +236,30 @@ class TestCliRunnerRunSuccess:
|
||||
def track_b() -> None:
|
||||
executed.append("b")
|
||||
|
||||
runner = px.CliRunner({
|
||||
"a": px.Graph.from_specs([px.TaskSpec("a", track_a)]),
|
||||
"b": px.Graph.from_specs([px.TaskSpec("b", track_b)]),
|
||||
})
|
||||
runner = px.CliRunner(
|
||||
aliases={
|
||||
"a": px.Graph.from_specs([px.TaskSpec("a", track_a)]),
|
||||
"b": px.Graph.from_specs([px.TaskSpec("b", track_b)]),
|
||||
}
|
||||
)
|
||||
_ = runner.run(["b"])
|
||||
assert executed == ["b"]
|
||||
|
||||
def test_run_multi_task_graph(self) -> None:
|
||||
"""应能执行带依赖的多任务图."""
|
||||
runner = px.CliRunner({"multi": _multi_task_graph()})
|
||||
runner = px.CliRunner(aliases={"multi": _multi_task_graph()})
|
||||
exit_code = runner.run(["multi"])
|
||||
assert exit_code == CliExitCode.SUCCESS.value
|
||||
|
||||
def test_run_with_strategy_override(self) -> None:
|
||||
"""应支持通过 --strategy 覆盖默认策略."""
|
||||
runner = px.CliRunner({"echo": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"echo": _echo_graph()})
|
||||
exit_code = runner.run(["echo", "--strategy", "thread"])
|
||||
assert exit_code == CliExitCode.SUCCESS.value
|
||||
|
||||
def test_run_with_dry_run(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""--dry-run 应只打印计划不执行."""
|
||||
runner = px.CliRunner({"echo": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"echo": _echo_graph()})
|
||||
exit_code = runner.run(["echo", "--dry-run"])
|
||||
assert exit_code == CliExitCode.SUCCESS.value
|
||||
captured = capsys.readouterr()
|
||||
@@ -272,7 +274,7 @@ class TestCliRunnerVerbose:
|
||||
|
||||
def test_verbose_default_prints_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""默认 verbose=True 应打印任务生命周期."""
|
||||
runner = px.CliRunner({"echo": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"echo": _echo_graph()})
|
||||
_ = runner.run(["echo"])
|
||||
captured = capsys.readouterr()
|
||||
# verbose 模式下应打印任务生命周期
|
||||
@@ -280,7 +282,7 @@ class TestCliRunnerVerbose:
|
||||
|
||||
def test_quiet_flag_disables_verbose(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""--quiet 应关闭 verbose 输出."""
|
||||
runner = px.CliRunner({"echo": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"echo": _echo_graph()})
|
||||
_ = runner.run(["echo", "--quiet"])
|
||||
captured = capsys.readouterr()
|
||||
# quiet 模式下不应有 [verbose] 前缀的输出
|
||||
@@ -288,14 +290,14 @@ class TestCliRunnerVerbose:
|
||||
|
||||
def test_verbose_false_constructor_disables_verbose(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""构造时 verbose=False 应关闭 verbose 输出."""
|
||||
runner = px.CliRunner({"echo": _echo_graph()}, verbose=False)
|
||||
runner = px.CliRunner(aliases={"echo": _echo_graph()}, verbose=False)
|
||||
_ = runner.run(["echo"])
|
||||
captured = capsys.readouterr()
|
||||
assert "[verbose]" not in captured.out
|
||||
|
||||
def test_verbose_prints_command_for_cmd_task(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""verbose 模式下 cmd 任务应打印执行的命令."""
|
||||
runner = px.CliRunner({"echo": _echo_graph(msg="verbose-test")})
|
||||
runner = px.CliRunner(aliases={"echo": _echo_graph(msg="verbose-test")})
|
||||
_ = runner.run(["echo"])
|
||||
captured = capsys.readouterr()
|
||||
# 应打印执行的命令
|
||||
@@ -305,7 +307,7 @@ class TestCliRunnerVerbose:
|
||||
|
||||
def test_verbose_prints_success_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""verbose 模式下成功任务应打印成功信息."""
|
||||
runner = px.CliRunner({"echo": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"echo": _echo_graph()})
|
||||
_ = runner.run(["echo"])
|
||||
captured = capsys.readouterr()
|
||||
assert "成功" in captured.out
|
||||
@@ -319,14 +321,14 @@ class TestCliRunnerVerbose:
|
||||
conditions=(lambda _ctx: False,),
|
||||
),
|
||||
])
|
||||
runner = px.CliRunner({"skip": graph})
|
||||
runner = px.CliRunner(aliases={"skip": graph})
|
||||
_ = runner.run(["skip"])
|
||||
captured = capsys.readouterr()
|
||||
assert "跳过" in captured.out
|
||||
|
||||
def test_verbose_prints_failure_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""verbose 模式下失败任务应打印失败信息."""
|
||||
runner = px.CliRunner({"fail": _failing_graph()})
|
||||
runner = px.CliRunner(aliases={"fail": _failing_graph()})
|
||||
_ = runner.run(["fail"])
|
||||
captured = capsys.readouterr()
|
||||
# 失败信息可能出现在 stdout (verbose) 或 stderr (PyFlowXError)
|
||||
@@ -342,7 +344,7 @@ class TestCliRunnerRunFailure:
|
||||
|
||||
def test_run_unknown_command_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""未知命令应返回 1 并打印错误."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
exit_code = runner.run(["unknown"])
|
||||
assert exit_code == CliExitCode.FAILURE.value
|
||||
captured = capsys.readouterr()
|
||||
@@ -351,7 +353,7 @@ class TestCliRunnerRunFailure:
|
||||
|
||||
def test_run_no_command_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""无命令时应返回 1 并打印帮助."""
|
||||
runner = px.CliRunner({"clean": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph()})
|
||||
exit_code = runner.run([])
|
||||
assert exit_code == CliExitCode.FAILURE.value
|
||||
captured = capsys.readouterr()
|
||||
@@ -359,13 +361,13 @@ class TestCliRunnerRunFailure:
|
||||
|
||||
def test_run_failing_task_returns_failure(self) -> None:
|
||||
"""任务失败时应返回 1."""
|
||||
runner = px.CliRunner({"fail": _failing_graph()})
|
||||
runner = px.CliRunner(aliases={"fail": _failing_graph()})
|
||||
exit_code = runner.run(["fail"])
|
||||
assert exit_code == CliExitCode.FAILURE.value
|
||||
|
||||
def test_run_failing_task_prints_error(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""任务失败时应打印错误信息."""
|
||||
runner = px.CliRunner({"fail": _failing_graph()})
|
||||
runner = px.CliRunner(aliases={"fail": _failing_graph()})
|
||||
_ = runner.run(["fail"])
|
||||
captured = capsys.readouterr()
|
||||
# PyFlowXError 信息应输出到 stderr
|
||||
@@ -380,17 +382,19 @@ class TestCliRunnerList:
|
||||
|
||||
def test_list_returns_success(self) -> None:
|
||||
"""--list 应返回 0."""
|
||||
runner = px.CliRunner({"clean": _echo_graph(), "build": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"clean": _echo_graph(), "build": _echo_graph()})
|
||||
exit_code = runner.run(["--list"])
|
||||
assert exit_code == CliExitCode.SUCCESS.value
|
||||
|
||||
def test_list_prints_all_commands(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""--list 应打印所有命令."""
|
||||
runner = px.CliRunner({
|
||||
"clean": _echo_graph("c", "clean"),
|
||||
"build": _echo_graph("b", "build"),
|
||||
"test": _echo_graph("t", "test"),
|
||||
})
|
||||
runner = px.CliRunner(
|
||||
aliases={
|
||||
"clean": _echo_graph("c", "clean"),
|
||||
"build": _echo_graph("b", "build"),
|
||||
"test": _echo_graph("t", "test"),
|
||||
}
|
||||
)
|
||||
_ = runner.run(["--list"])
|
||||
captured = capsys.readouterr()
|
||||
assert "clean" in captured.out
|
||||
@@ -404,7 +408,7 @@ class TestCliRunnerList:
|
||||
def track() -> None:
|
||||
executed.append("ran")
|
||||
|
||||
runner = px.CliRunner({"a": px.Graph.from_specs([px.TaskSpec("a", track)])})
|
||||
runner = px.CliRunner(aliases={"a": px.Graph.from_specs([px.TaskSpec("a", track)])})
|
||||
_ = runner.run(["--list"])
|
||||
assert executed == []
|
||||
|
||||
@@ -417,7 +421,7 @@ class TestCliRunnerErrorHandling:
|
||||
|
||||
def test_keyboard_interrupt_returns_130(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""KeyboardInterrupt 应返回 130."""
|
||||
runner = px.CliRunner({"echo": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"echo": _echo_graph()})
|
||||
|
||||
def raise_interrupt(*_args: Any, **_kwargs: Any) -> None:
|
||||
raise KeyboardInterrupt
|
||||
@@ -430,7 +434,7 @@ class TestCliRunnerErrorHandling:
|
||||
|
||||
def test_pyflowx_error_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""PyFlowXError 应返回 1."""
|
||||
runner = px.CliRunner({"echo": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"echo": _echo_graph()})
|
||||
|
||||
def raise_error(*_args: Any, **_kwargs: Any) -> None:
|
||||
raise TaskFailedError("echo", RuntimeError("boom"), 1)
|
||||
@@ -447,7 +451,7 @@ class TestCliRunnerErrorHandling:
|
||||
class CustomError(Exception):
|
||||
pass
|
||||
|
||||
runner = px.CliRunner({"echo": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"echo": _echo_graph()})
|
||||
|
||||
def raise_custom(*_args: Any, **_kwargs: Any) -> None:
|
||||
raise CustomError("unexpected")
|
||||
@@ -464,14 +468,14 @@ class TestCliRunnerRunCli:
|
||||
|
||||
def test_run_cli_calls_sys_exit(self) -> None:
|
||||
"""run_cli 应调用 sys.exit."""
|
||||
runner = px.CliRunner({"echo": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"echo": _echo_graph()})
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
runner.run_cli(["echo"])
|
||||
assert exc_info.value.code == CliExitCode.SUCCESS.value
|
||||
|
||||
def test_run_cli_exit_code_on_failure(self) -> None:
|
||||
"""run_cli 失败时应以非零码退出."""
|
||||
runner = px.CliRunner({"fail": _failing_graph()})
|
||||
runner = px.CliRunner(aliases={"fail": _failing_graph()})
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
runner.run_cli(["fail"])
|
||||
assert exc_info.value.code == CliExitCode.FAILURE.value
|
||||
@@ -479,7 +483,7 @@ class TestCliRunnerRunCli:
|
||||
def test_run_cli_no_args_uses_sys_argv(self, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""run_cli 无参数时应使用 sys.argv."""
|
||||
monkeypatch.setattr(sys, "argv", ["pymake", "echo"])
|
||||
runner = px.CliRunner({"echo": _echo_graph()})
|
||||
runner = px.CliRunner(aliases={"echo": _echo_graph()})
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
runner.run_cli()
|
||||
assert exc_info.value.code == CliExitCode.SUCCESS.value
|
||||
@@ -520,7 +524,7 @@ class TestCliRunnerIntegration:
|
||||
conditions=(lambda _ctx: False,),
|
||||
),
|
||||
])
|
||||
runner = px.CliRunner({"skip": graph})
|
||||
runner = px.CliRunner(aliases={"skip": graph})
|
||||
exit_code = runner.run(["skip"])
|
||||
assert exit_code == CliExitCode.SUCCESS.value
|
||||
|
||||
@@ -533,7 +537,7 @@ class TestCliRunnerIntegration:
|
||||
conditions=(lambda _ctx: True,),
|
||||
),
|
||||
])
|
||||
runner = px.CliRunner({"run": graph})
|
||||
runner = px.CliRunner(aliases={"run": graph})
|
||||
exit_code = runner.run(["run"])
|
||||
assert exit_code == CliExitCode.SUCCESS.value
|
||||
|
||||
@@ -554,17 +558,19 @@ class TestCliRunnerIntegration:
|
||||
px.TaskSpec("c", make("c"), depends_on=("a",)),
|
||||
px.TaskSpec("d", make("d"), depends_on=("b", "c")),
|
||||
])
|
||||
runner = px.CliRunner({"diamond": graph})
|
||||
runner = px.CliRunner(aliases={"diamond": graph})
|
||||
exit_code = runner.run(["diamond"])
|
||||
assert exit_code == CliExitCode.SUCCESS.value
|
||||
assert order == ["a", "b", "c", "d"]
|
||||
|
||||
def test_mixed_fn_and_cmd_commands(self) -> None:
|
||||
"""混合 fn 和 cmd 的命令应都能执行."""
|
||||
runner = px.CliRunner({
|
||||
"fn_cmd": px.Graph.from_specs([px.TaskSpec("fn", fn=lambda: "fn-result")]),
|
||||
"cmd_cmd": px.Graph.from_specs([px.TaskSpec("cmd", cmd=[*ECHO_CMD, "cmd-result"])]),
|
||||
})
|
||||
runner = px.CliRunner(
|
||||
aliases={
|
||||
"fn_cmd": px.Graph.from_specs([px.TaskSpec("fn", fn=lambda: "fn-result")]),
|
||||
"cmd_cmd": px.Graph.from_specs([px.TaskSpec("cmd", cmd=[*ECHO_CMD, "cmd-result"])]),
|
||||
}
|
||||
)
|
||||
assert runner.run(["fn_cmd"]) == CliExitCode.SUCCESS.value
|
||||
assert runner.run(["cmd_cmd"]) == CliExitCode.SUCCESS.value
|
||||
|
||||
@@ -580,7 +586,7 @@ class TestCliRunnerIntegration:
|
||||
ls_cmd = ["ls"]
|
||||
|
||||
graph = px.Graph.from_specs([px.TaskSpec("ls", cmd=ls_cmd, cwd=Path(tmpdir))])
|
||||
runner = px.CliRunner({"ls": graph})
|
||||
runner = px.CliRunner(aliases={"ls": graph})
|
||||
exit_code = runner.run(["ls"])
|
||||
assert exit_code == CliExitCode.SUCCESS.value
|
||||
|
||||
@@ -612,3 +618,109 @@ class TestApplyVerboseToGraph:
|
||||
new_graph = _apply_verbose_to_graph(graph, verbose=True)
|
||||
new_spec = new_graph.spec("a")
|
||||
assert new_spec.verbose is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------- #
|
||||
# 新 API: tasks + aliases
|
||||
# ---------------------------------------------------------------------- #
|
||||
class TestCliRunnerNewApi:
|
||||
"""测试 CliRunner 的 tasks + aliases 新 API."""
|
||||
|
||||
def test_tasks_plus_aliases_single_str(self) -> None:
|
||||
"""tasks 注册 + aliases str 引用单任务."""
|
||||
runner = px.CliRunner(
|
||||
tasks=[px.cmd([*ECHO_CMD, "a"], name="task_a")],
|
||||
aliases={"a": "task_a"},
|
||||
)
|
||||
assert runner.commands == ["a"]
|
||||
assert runner.run(["a"]) == CliExitCode.SUCCESS.value
|
||||
|
||||
def test_aliases_list_str_builds_chain(self) -> None:
|
||||
"""aliases list[str] 应建立 chain 依赖(后一个依赖前一个)."""
|
||||
runner = px.CliRunner(
|
||||
tasks=[
|
||||
px.cmd([*ECHO_CMD, "a"], name="task_a"),
|
||||
px.cmd([*ECHO_CMD, "b"], name="task_b"),
|
||||
],
|
||||
aliases={"ab": ["task_a", "task_b"]},
|
||||
)
|
||||
graph = runner.graphs["ab"]
|
||||
specs = graph.all_specs()
|
||||
assert specs["task_b"].depends_on == ("task_a",)
|
||||
|
||||
def test_aliases_taskspec_value(self) -> None:
|
||||
"""aliases 值为 TaskSpec 时直接生成单任务图."""
|
||||
spec = px.cmd([*ECHO_CMD, "x"], name="inline_x")
|
||||
runner = px.CliRunner(aliases={"x": spec})
|
||||
assert runner.run(["x"]) == CliExitCode.SUCCESS.value
|
||||
|
||||
def test_aliases_graph_value(self) -> None:
|
||||
"""aliases 值为 Graph 时原样使用(复杂场景:conditions 等)."""
|
||||
graph = px.Graph.from_specs([
|
||||
px.TaskSpec("a", cmd=[*ECHO_CMD, "a"]),
|
||||
px.TaskSpec("b", cmd=[*ECHO_CMD, "b"], depends_on=("a",)),
|
||||
])
|
||||
runner = px.CliRunner(aliases={"g": graph})
|
||||
assert set(runner.graphs["g"].all_specs().keys()) == {"a", "b"}
|
||||
|
||||
def test_alias_name_same_as_task_name_via_taskspec(self) -> None:
|
||||
"""alias 名与 task 名相同时,用 TaskSpec 避免自引用循环."""
|
||||
spec = px.cmd([*ECHO_CMD, "same"], name="same")
|
||||
runner = px.CliRunner(aliases={"same": spec})
|
||||
assert runner.run(["same"]) == CliExitCode.SUCCESS.value
|
||||
|
||||
def test_alias_str_reference_to_other_alias(self) -> None:
|
||||
"""alias 值为 str 引用其他 alias."""
|
||||
runner = px.CliRunner(
|
||||
aliases={
|
||||
"base": px.cmd([*ECHO_CMD, "base"], name="base"),
|
||||
"wrapper": "base",
|
||||
},
|
||||
)
|
||||
assert runner.run(["wrapper"]) == CliExitCode.SUCCESS.value
|
||||
|
||||
def test_empty_aliases_raises(self) -> None:
|
||||
"""空 aliases 应抛 ValueError."""
|
||||
with pytest.raises(ValueError, match="至少需要一个别名"):
|
||||
_ = px.CliRunner()
|
||||
|
||||
def test_empty_list_value_raises(self) -> None:
|
||||
"""空 list 作为 alias 值应抛 ValueError."""
|
||||
with pytest.raises(ValueError, match="任务列表为空"):
|
||||
_ = px.CliRunner(aliases={"x": []})
|
||||
|
||||
def test_invalid_value_type_raises(self) -> None:
|
||||
"""无效类型(int)作为 alias 值应抛 TypeError."""
|
||||
with pytest.raises(TypeError, match="值类型无效"):
|
||||
_ = px.CliRunner(aliases={"x": 123}) # type: ignore[dict-item]
|
||||
|
||||
def test_invalid_list_element_type_raises(self) -> None:
|
||||
"""list 中非 str/TaskSpec 元素应抛 TypeError."""
|
||||
with pytest.raises(TypeError, match="列表元素类型无效"):
|
||||
_ = px.CliRunner(aliases={"x": [123]}) # type: ignore[list-item]
|
||||
|
||||
def test_duplicate_task_name_raises(self) -> None:
|
||||
"""tasks 中重名任务应抛 ValueError."""
|
||||
spec = px.cmd([*ECHO_CMD, "a"], name="dup")
|
||||
with pytest.raises(ValueError, match="任务名重复"):
|
||||
_ = px.CliRunner(tasks=[spec, spec], aliases={"a": "dup"})
|
||||
|
||||
def test_commands_excludes_unreferenced_tasks(self) -> None:
|
||||
"""commands 只含 aliases,不含 tasks 中未引用的任务."""
|
||||
runner = px.CliRunner(
|
||||
tasks=[
|
||||
px.cmd([*ECHO_CMD, "a"], name="used"),
|
||||
px.cmd([*ECHO_CMD, "b"], name="unused"),
|
||||
],
|
||||
aliases={"a": "used"},
|
||||
)
|
||||
assert runner.commands == ["a"]
|
||||
|
||||
def test_unknown_command_rejected(self) -> None:
|
||||
"""未注册的 alias 名应被拒绝(不接受裸 task 名)."""
|
||||
runner = px.CliRunner(
|
||||
tasks=[px.cmd([*ECHO_CMD, "a"], name="task_a")],
|
||||
aliases={"a": "task_a"},
|
||||
)
|
||||
# task_a 是任务名,不是 alias,应被拒绝
|
||||
assert runner.run(["task_a"]) == CliExitCode.FAILURE.value
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
"""Tests for streaming result passing (iterators between tasks)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Iterator
|
||||
|
||||
import pyflowx as px
|
||||
|
||||
|
||||
def test_generator_passed_as_iterator() -> None:
|
||||
"""上游返回生成器,下游应能惰性消费."""
|
||||
|
||||
@px.task
|
||||
def source() -> Iterator[int]:
|
||||
yield from range(5)
|
||||
|
||||
@px.task(depends_on=("source",))
|
||||
def consume(source: Iterator[int]) -> int:
|
||||
return sum(source)
|
||||
|
||||
graph = px.Graph.from_specs([source, consume])
|
||||
report = px.run(graph)
|
||||
assert report.success
|
||||
assert report["consume"] == 10
|
||||
|
||||
|
||||
def test_large_range_streaming() -> None:
|
||||
"""大范围迭代器流式传递,避免中间列表."""
|
||||
|
||||
@px.task
|
||||
def numbers() -> Iterator[int]:
|
||||
yield from range(1000)
|
||||
|
||||
@px.task(depends_on=("numbers",))
|
||||
def total(numbers: Iterator[int]) -> int:
|
||||
return sum(numbers)
|
||||
|
||||
graph = px.Graph.from_specs([numbers, total])
|
||||
report = px.run(graph)
|
||||
assert report.success
|
||||
assert report["total"] == sum(range(1000))
|
||||
|
||||
|
||||
def test_chain_multiple_streams() -> None:
|
||||
"""多个流式任务串联."""
|
||||
|
||||
@px.task
|
||||
def gen() -> Iterator[int]:
|
||||
yield from range(10)
|
||||
|
||||
@px.task(depends_on=("gen",))
|
||||
def doubled(gen: Iterator[int]) -> Iterator[int]:
|
||||
for x in gen:
|
||||
yield x * 2
|
||||
|
||||
@px.task(depends_on=("doubled",))
|
||||
def collect(doubled: Iterator[int]) -> list[int]:
|
||||
return list(doubled)
|
||||
|
||||
graph = px.Graph.from_specs([gen, doubled, collect])
|
||||
report = px.run(graph)
|
||||
assert report.success
|
||||
assert report["collect"] == [x * 2 for x in range(10)]
|
||||
@@ -14,6 +14,7 @@ from pyflowx.task import (
|
||||
TaskSpec,
|
||||
TaskStatus,
|
||||
_env_and_cwd,
|
||||
cmd,
|
||||
task_template,
|
||||
)
|
||||
|
||||
@@ -78,6 +79,41 @@ def test_retry_policy_negative_jitter_rejected() -> None:
|
||||
RetryPolicy(jitter=-1)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------- #
|
||||
# cmd() 工厂
|
||||
# ---------------------------------------------------------------------- #
|
||||
def test_cmd_factory_default_name_from_two_elements() -> None:
|
||||
"""cmd() 默认 name = '_'.join(command[:2])."""
|
||||
spec = cmd(["uv", "build"])
|
||||
assert spec.name == "uv_build"
|
||||
assert spec.cmd == ["uv", "build"]
|
||||
|
||||
|
||||
def test_cmd_factory_default_name_single_element() -> None:
|
||||
"""cmd() 单元素命令 name = command[0]."""
|
||||
spec = cmd(["ls"])
|
||||
assert spec.name == "ls"
|
||||
|
||||
|
||||
def test_cmd_factory_explicit_name() -> None:
|
||||
"""cmd() 显式 name 覆盖默认推导."""
|
||||
spec = cmd(["ruff", "check", "--fix"], name="lint")
|
||||
assert spec.name == "lint"
|
||||
|
||||
|
||||
def test_cmd_factory_passes_depends_on() -> None:
|
||||
"""cmd() depends_on 透传给 TaskSpec."""
|
||||
spec = cmd(["echo", "b"], name="b", depends_on=("a",))
|
||||
assert spec.depends_on == ("a",)
|
||||
|
||||
|
||||
def test_cmd_factory_passes_extra_kwargs() -> None:
|
||||
"""cmd() 其余 kwargs 透传给 TaskSpec."""
|
||||
spec = cmd(["echo", "x"], name="x", timeout=10.0, tags=("t1",))
|
||||
assert spec.timeout == 10.0
|
||||
assert spec.tags == ("t1",)
|
||||
|
||||
|
||||
def test_retry_policy_retries_property() -> None:
|
||||
policy = RetryPolicy(max_attempts=3)
|
||||
assert policy.retries == 2
|
||||
|
||||
@@ -0,0 +1,136 @@
|
||||
"""Tests for the @task decorator API."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any, Mapping
|
||||
|
||||
import pyflowx as px
|
||||
from pyflowx.task import RetryPolicy, TaskHooks, TaskSpec
|
||||
|
||||
|
||||
def test_task_decorator_plain() -> None:
|
||||
"""@task 无参数装饰:name 取函数名,返回 TaskSpec."""
|
||||
|
||||
@px.task
|
||||
def extract() -> list[int]:
|
||||
return [1, 2, 3]
|
||||
|
||||
assert isinstance(extract, TaskSpec)
|
||||
assert extract.name == "extract"
|
||||
assert extract.fn is not None
|
||||
assert extract.depends_on == ()
|
||||
|
||||
|
||||
def test_task_decorator_with_params() -> None:
|
||||
"""@task(...) 带参数装饰:传递依赖与重试."""
|
||||
|
||||
@px.task(depends_on=("extract",), retry=RetryPolicy(max_attempts=3))
|
||||
def double(extract: list[int]) -> list[int]:
|
||||
return [x * 2 for x in extract]
|
||||
|
||||
assert isinstance(double, TaskSpec)
|
||||
assert double.name == "double"
|
||||
assert double.depends_on == ("extract",)
|
||||
assert double.retry.max_attempts == 3
|
||||
|
||||
|
||||
def test_task_decorator_explicit_name() -> None:
|
||||
"""@task(name=...) 应使用显式名称而非函数名."""
|
||||
|
||||
@px.task(name="custom_name")
|
||||
def my_func() -> None:
|
||||
return None
|
||||
|
||||
assert my_func.name == "custom_name"
|
||||
|
||||
|
||||
def test_task_decorator_cmd_form() -> None:
|
||||
"""@task(cmd=...) 应支持命令形式."""
|
||||
|
||||
spec = px.task(cmd=["ls", "-la"], name="list_files")
|
||||
assert isinstance(spec, TaskSpec)
|
||||
assert spec.name == "list_files"
|
||||
assert spec.cmd == ["ls", "-la"]
|
||||
|
||||
|
||||
def test_task_decorator_full_options() -> None:
|
||||
"""@task 应支持全部 TaskSpec 字段."""
|
||||
|
||||
@px.task(
|
||||
depends_on=("a",),
|
||||
soft_depends_on=("b",),
|
||||
defaults={"b": 0},
|
||||
args=(1,),
|
||||
kwargs={"x": 2},
|
||||
retry=RetryPolicy(max_attempts=5),
|
||||
timeout=10.0,
|
||||
tags=("t1",),
|
||||
conditions=(px.BuiltinConditions.IS_WINDOWS,), # type: ignore[arg-type]
|
||||
cwd="/tmp",
|
||||
env={"K": "v"},
|
||||
verbose=True,
|
||||
skip_if_missing=True,
|
||||
allow_upstream_skip=True,
|
||||
strategy="thread",
|
||||
priority=3,
|
||||
concurrency_key="db",
|
||||
continue_on_error=True,
|
||||
)
|
||||
def f(a: int) -> int:
|
||||
return a
|
||||
|
||||
assert f.depends_on == ("a",)
|
||||
assert f.soft_depends_on == ("b",)
|
||||
assert f.defaults == {"b": 0}
|
||||
assert f.args == (1,)
|
||||
assert f.kwargs == {"x": 2}
|
||||
assert f.retry.max_attempts == 5
|
||||
assert f.timeout == 10.0
|
||||
assert f.tags == ("t1",)
|
||||
assert len(f.conditions) == 1
|
||||
assert isinstance(f.cwd, Path)
|
||||
assert f.cwd == Path("/tmp")
|
||||
assert f.env == {"K": "v"}
|
||||
assert f.verbose is True
|
||||
assert f.skip_if_missing is True
|
||||
assert f.allow_upstream_skip is True
|
||||
assert f.strategy == "thread"
|
||||
assert f.priority == 3
|
||||
assert f.concurrency_key == "db"
|
||||
assert f.continue_on_error is True
|
||||
|
||||
|
||||
def test_task_decorator_runs_in_graph() -> None:
|
||||
"""装饰器生成的 TaskSpec 应能直接构建图并运行."""
|
||||
|
||||
@px.task
|
||||
def extract() -> list[int]:
|
||||
return [1, 2, 3]
|
||||
|
||||
@px.task(depends_on=("extract",))
|
||||
def double(extract: list[int]) -> list[int]:
|
||||
return [x * 2 for x in extract]
|
||||
|
||||
graph = px.Graph.from_specs([extract, double])
|
||||
report = px.run(graph)
|
||||
assert report.success
|
||||
assert report["double"] == [2, 4, 6]
|
||||
|
||||
|
||||
def test_task_decorator_hooks_passthrough() -> None:
|
||||
"""@task(hooks=...) 应传递 TaskHooks 实例."""
|
||||
|
||||
hooks = TaskHooks(pre_run=lambda _spec: None)
|
||||
spec = px.task(fn=lambda: None, hooks=hooks, name="h")
|
||||
assert spec.hooks is hooks
|
||||
|
||||
|
||||
def test_task_decorator_cache_key_passthrough() -> None:
|
||||
"""@task(cache_key=...) 应传递缓存键函数."""
|
||||
|
||||
def ck(ctx: Mapping[str, Any]) -> str:
|
||||
return "k"
|
||||
|
||||
spec = px.task(fn=lambda: None, cache_key=ck, name="c")
|
||||
assert spec.cache_key is ck
|
||||
Reference in New Issue
Block a user