40f641611b
1. 将CliRunner默认执行策略从sequential改为dependency 2. 新增RunReport的任务状态查询和时长统计方法 3. 实现task装饰器并补充executor参数文档 4. 新增进程池执行器支持CPU密集型任务 5. 新增Graph.chain链式构建和add_subgraph子图合并功能 6. 新增流式任务传递、进程池执行、命名空间等多类测试用例 7. 补充tests目录路径导入配置
137 lines
3.7 KiB
Python
137 lines
3.7 KiB
Python
"""Tests for the @task decorator API."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Any, Mapping
|
|
|
|
import pyflowx as px
|
|
from pyflowx.task import RetryPolicy, TaskHooks, TaskSpec
|
|
|
|
|
|
def test_task_decorator_plain() -> None:
    """Bare ``@task``: the result is a TaskSpec named after the function."""

    @px.task
    def extract() -> list[int]:
        return [1, 2, 3]

    # The decorator replaces the function object with the spec it built.
    spec = extract

    assert isinstance(spec, TaskSpec)
    assert spec.name == "extract"
    assert spec.fn is not None
    assert spec.depends_on == ()
|
|
|
|
|
|
def test_task_decorator_with_params() -> None:
    """``@task(...)`` with arguments: dependencies and retry are passed on."""

    upstream = ("extract",)
    policy = RetryPolicy(max_attempts=3)

    @px.task(depends_on=upstream, retry=policy)
    def double(extract: list[int]) -> list[int]:
        return [value * 2 for value in extract]

    spec = double

    assert isinstance(spec, TaskSpec)
    assert spec.name == "double"
    assert spec.depends_on == ("extract",)
    assert spec.retry.max_attempts == 3
|
|
|
|
|
|
def test_task_decorator_explicit_name() -> None:
    """``@task(name=...)`` should use the explicit name, not the function name."""

    explicit = "custom_name"

    @px.task(name=explicit)
    def my_func() -> None:
        return None

    spec = my_func

    assert spec.name == "custom_name"
|
|
|
|
|
|
def test_task_decorator_cmd_form() -> None:
    """``task(cmd=...)`` should support the command form."""

    command = ["ls", "-la"]
    built = px.task(cmd=command, name="list_files")

    assert isinstance(built, TaskSpec)
    assert built.name == "list_files"
    # Compare against a fresh literal rather than the list that was passed in.
    assert built.cmd == ["ls", "-la"]
|
|
|
|
|
|
def test_task_decorator_full_options() -> None:
    """``@task`` should accept every TaskSpec field exercised below."""

    policy = RetryPolicy(max_attempts=5)
    workdir = "/tmp"

    @px.task(
        depends_on=("a",),
        soft_depends_on=("b",),
        defaults={"b": 0},
        args=(1,),
        kwargs={"x": 2},
        retry=policy,
        timeout=10.0,
        tags=("t1",),
        conditions=(px.BuiltinConditions.IS_WINDOWS,),  # type: ignore[arg-type]
        cwd=workdir,
        env={"K": "v"},
        verbose=True,
        skip_if_missing=True,
        allow_upstream_skip=True,
        strategy="thread",
        priority=3,
        concurrency_key="db",
        continue_on_error=True,
    )
    def f(a: int) -> int:
        return a

    spec = f

    # Dependency wiring and call arguments.
    assert spec.depends_on == ("a",)
    assert spec.soft_depends_on == ("b",)
    assert spec.defaults == {"b": 0}
    assert spec.args == (1,)
    assert spec.kwargs == {"x": 2}

    # Execution policy.
    assert spec.retry.max_attempts == 5
    assert spec.timeout == 10.0
    assert spec.tags == ("t1",)
    assert len(spec.conditions) == 1

    # The string cwd is expected to come back as a Path.
    assert isinstance(spec.cwd, Path)
    assert spec.cwd == Path("/tmp")
    assert spec.env == {"K": "v"}

    # Boolean switches and scheduling options.
    assert spec.verbose is True
    assert spec.skip_if_missing is True
    assert spec.allow_upstream_skip is True
    assert spec.strategy == "thread"
    assert spec.priority == 3
    assert spec.concurrency_key == "db"
    assert spec.continue_on_error is True
|
|
|
|
|
|
def test_task_decorator_runs_in_graph() -> None:
    """TaskSpecs produced by the decorator can build a graph and run as-is."""

    @px.task
    def extract() -> list[int]:
        return [1, 2, 3]

    @px.task(depends_on=("extract",))
    def double(extract: list[int]) -> list[int]:
        return [value * 2 for value in extract]

    specs = [extract, double]
    pipeline = px.Graph.from_specs(specs)
    outcome = px.run(pipeline)

    assert outcome.success
    assert outcome["double"] == [2, 4, 6]
|
|
|
|
|
|
def test_task_decorator_hooks_passthrough() -> None:
    """``task(hooks=...)`` should hand over the very same TaskHooks instance."""

    task_hooks = TaskHooks(pre_run=lambda _spec: None)
    built = px.task(fn=lambda: None, hooks=task_hooks, name="h")

    # Identity, not equality: the instance must be passed through untouched.
    assert built.hooks is task_hooks
|
|
|
|
|
|
def test_task_decorator_cache_key_passthrough() -> None:
    """``task(cache_key=...)`` should pass the cache-key function through."""

    def ck(ctx: Mapping[str, Any]) -> str:
        return "k"

    built = px.task(fn=lambda: None, cache_key=ck, name="c")

    # Identity, not equality: the callable must be stored unchanged.
    assert built.cache_key is ck
|