chore: 批量优化代码与配置,完善类型注解

This commit is contained in:
2026-06-21 10:04:01 +08:00
parent 56c018e72e
commit 60083bcb6e
17 changed files with 351 additions and 357 deletions
+157 -160
View File
@@ -1,4 +1,4 @@
"""Tests for context injection rules."""
"""测试上下文注入规则."""
from __future__ import annotations
@@ -11,225 +11,222 @@ from pyflowx.context import _is_context_annotation, build_call_args, describe_in
from pyflowx.errors import InjectionError
def test_inject_by_parameter_name() -> None:
def fn(a: int, b: str) -> str:
return f"{a}{b}"
class TestBuildCallArgs:
"""测试 build_call_args 函数."""
spec = px.TaskSpec("c", fn, depends_on=("a", "b"))
args, kwargs = build_call_args(spec, {"a": 1, "b": "x"})
assert args == ()
assert kwargs == {"a": 1, "b": "x"}
def test_inject_by_parameter_name(self) -> None:
"""参数名匹配依赖名时应注入对应结果."""
def fn(a: int, b: str) -> str:
return f"{a}{b}"
def test_inject_context_annotation() -> None:
def fn(ctx: px.Context) -> int:
return len(ctx)
spec = px.TaskSpec("c", fn, depends_on=("a", "b"))
_args, kwargs = build_call_args(spec, {"a": 1, "b": "x"})
assert kwargs == {"a": 1, "b": "x"}
spec = px.TaskSpec("agg", fn, depends_on=("a", "b"))
args, kwargs = build_call_args(spec, {"a": 1, "b": 2, "c": 99})
# Only the task's own deps are passed.
assert kwargs == {"ctx": {"a": 1, "b": 2}}
def test_inject_context_annotation(self) -> None:
"""标注为 Context 的参数应接收完整依赖映射."""
def fn(ctx: px.Context) -> int:
return len(ctx)
def test_inject_var_keyword() -> None:
def fn(**kwargs: Any) -> int:
return sum(kwargs.values())
spec = px.TaskSpec("agg", fn, depends_on=("a", "b"))
_args, kwargs = build_call_args(spec, {"a": 1, "b": 2, "c": 99})
# Only the task's own deps are passed.
assert kwargs == {"ctx": {"a": 1, "b": 2}}
spec = px.TaskSpec("agg", fn, depends_on=("a", "b"))
args, kwargs = build_call_args(spec, {"a": 1, "b": 2})
assert kwargs == {"a": 1, "b": 2}
def test_inject_var_keyword(self) -> None:
"""**kwargs 参数应以 dict 形式接收所有依赖结果."""
def fn(**kwargs: Any) -> int: # pyright: ignore[reportExplicitAny, reportAny]
return sum(kwargs.values())
def test_static_args_and_kwargs() -> None:
def fn(uid: int, source: str) -> str:
return f"{source}:{uid}"
spec = px.TaskSpec("agg", fn, depends_on=("a", "b"))
_args, kwargs = build_call_args(spec, {"a": 1, "b": 2})
assert kwargs == {"a": 1, "b": 2}
spec = px.TaskSpec("fetch", fn, args=(42,), kwargs={"source": "api"})
args, kwargs = build_call_args(spec, {})
assert args == (42,)
assert kwargs == {"source": "api"}
def test_static_args_and_kwargs(self) -> None:
"""静态 args/kwargs 应正确填充非依赖参数."""
def fn(uid: int, source: str) -> str:
return f"{source}:{uid}"
def test_default_param_not_required() -> None:
def fn(a: int, flag: bool = True) -> int:
return a if flag else 0
spec = px.TaskSpec("fetch", fn, args=(42,), kwargs={"source": "api"})
args, kwargs = build_call_args(spec, {})
assert args == (42,)
assert kwargs == {"source": "api"}
spec = px.TaskSpec("t", fn, depends_on=("a",))
args, kwargs = build_call_args(spec, {"a": 5})
assert kwargs == {"a": 5}
def test_default_param_not_required(self) -> None:
"""有默认值的参数无需依赖或静态值."""
def fn(a: int, flag: bool = True) -> int:
return a if flag else 0
def test_unresolved_required_param_raises() -> None:
def fn(a: int, missing: str) -> None:
return None
spec = px.TaskSpec("t", fn, depends_on=("a",))
_args, kwargs = build_call_args(spec, {"a": 5})
assert kwargs == {"a": 5}
spec = px.TaskSpec("t", fn, depends_on=("a",))
with pytest.raises(InjectionError) as exc_info:
build_call_args(spec, {"a": 1})
assert "missing" in str(exc_info.value)
def test_unresolved_required_param_raises(self) -> None:
"""必需参数无法解析时应抛出 InjectionError."""
def fn(_a: int, _: str) -> None:
return None
def test_static_kwargs_collide_with_dependency() -> None:
def fn(a: int) -> int:
return a
spec = px.TaskSpec("t", fn, depends_on=("a",))
with pytest.raises(InjectionError) as exc_info:
_ = build_call_args(spec, {"a": 1})
assert "Cannot inject" in str(exc_info.value)
spec = px.TaskSpec("t", fn, depends_on=("a",), kwargs={"a": 99})
with pytest.raises(InjectionError):
build_call_args(spec, {"a": 1})
def test_static_kwargs_collide_with_dependency(self) -> None:
"""静态 kwargs 与依赖名冲突时应抛出 InjectionError."""
def fn(a: int) -> int:
return a
def test_describe_injection() -> None:
def fn(a: int, ctx: px.Context, flag: bool = False) -> None:
return None
spec = px.TaskSpec("t", fn, depends_on=("a",), kwargs={"a": 99})
with pytest.raises(InjectionError):
_ = build_call_args(spec, {"a": 1})
spec = px.TaskSpec("t", fn, depends_on=("a",))
desc = describe_injection(spec)
assert "a=<result:a>" in desc
assert "ctx=<Context>" in desc
assert "flag=<default>" in desc
def test_var_positional_not_required(self) -> None:
"""*args 参数不应触发 InjectionError."""
def fn(*args: Any) -> int: # pyright: ignore[reportExplicitAny, reportAny]
return len(args)
# ---------------------------------------------------------------------- #
# _is_context_annotation 各分支
# ---------------------------------------------------------------------- #
def test_is_context_annotation_direct_object() -> None:
"""直接传入 Context 别名对象应返回 True。"""
assert _is_context_annotation(px.Context) is True
spec = px.TaskSpec("t", fn, args=(1, 2, 3))
args, kwargs = build_call_args(spec, {})
assert args == (1, 2, 3)
assert kwargs == {}
def test_var_keyword_consumes_leftover(self) -> None:
"""**kwargs 应吞掉未被具名参数消费的依赖结果."""
def test_is_context_annotation_string() -> None:
"""字符串形式的注解应被识别。"""
assert _is_context_annotation("Context") is True
assert _is_context_annotation("px.Context") is True
assert _is_context_annotation("pyflowx.Context") is True
assert _is_context_annotation("NotContext") is False
assert _is_context_annotation("int") is False
def fn(a: int, **rest: Any) -> int: # pyright: ignore[reportExplicitAny, reportAny]
return a + sum(rest.values())
spec = px.TaskSpec("t", fn, depends_on=("a", "b", "c"))
_args, kwargs = build_call_args(spec, {"a": 1, "b": 2, "c": 3})
assert kwargs == {"a": 1, "b": 2, "c": 3}
def test_is_context_annotation_typing_alias() -> None:
"""具有 __name__/_name 为 Context/Mapping 的 typing 别名应返回 True。"""
def test_no_var_keyword_drops_leftover(self) -> None:
"""无 **kwargs 时,未被消费的依赖结果被丢弃(不报错)."""
class FakeAlias:
__name__ = "Context"
def fn(a: int) -> int:
return a
assert _is_context_annotation(FakeAlias()) is True
spec = px.TaskSpec("t", fn, depends_on=("a", "b"))
# b 是依赖但 fn 不接收它 —— 应正常工作
_args, kwargs = build_call_args(spec, {"a": 1, "b": 2})
assert kwargs == {"a": 1}
class FakeMapping:
__name__ = "Mapping"
def test_context_annotation_only_deps(self) -> None:
"""Context 标注只接收该任务自身 depends_on 的结果."""
assert _is_context_annotation(FakeMapping()) is True
def fn(ctx: px.Context) -> int:
return len(ctx)
spec = px.TaskSpec("t", fn, depends_on=("a", "b"))
_args, kwargs = build_call_args(spec, {"a": 1, "b": 2, "c": 99})
assert kwargs == {"ctx": {"a": 1, "b": 2}}
def test_is_context_annotation_other() -> None:
"""其他类型注解应返回 False。"""
assert _is_context_annotation(int) is False
assert _is_context_annotation(str) is False
assert _is_context_annotation(None) is False
class TestDescribeInjection:
"""测试 describe_injection 函数."""
# ---------------------------------------------------------------------- #
# describe_injection 其余分支
# ---------------------------------------------------------------------- #
def test_describe_injection_var_positional() -> None:
"""*args 参数应显示为 *args。"""
def test_describe_injection(self) -> None:
"""应正确描述依赖注入、Context 标注和默认值."""
def fn(*args: Any) -> None:
return None
def fn(a: int, ctx: px.Context, flag: bool = False) -> None: # noqa: ARG001
return None
spec = px.TaskSpec("t", fn)
desc = describe_injection(spec)
assert "*args" in desc
spec = px.TaskSpec("t", fn, depends_on=("a",))
desc = describe_injection(spec)
assert "a=<result:a>" in desc
assert "ctx=<Context>" in desc
assert "flag=<default>" in desc
def test_var_positional(self) -> None:
"""*args 参数应显示为 *args."""
def test_describe_injection_var_keyword() -> None:
"""**kwargs 参数应显示为 **kwargs=<all-deps>。"""
def fn(*args: Any) -> None: # noqa: ARG001
return None
def fn(**kwargs: Any) -> None:
return None
spec = px.TaskSpec("t", fn)
desc = describe_injection(spec)
assert "*args" in desc
spec = px.TaskSpec("t", fn, depends_on=("a",))
desc = describe_injection(spec)
assert "**kwargs=<all-deps>" in desc
def test_var_keyword(self) -> None:
"""**kwargs 参数应显示为 **kwargs=<all-deps>."""
def fn(**kwargs: Any) -> None: # pyright: ignore[reportExplicitAny, reportAny] # noqa: ARG001
return None
def test_describe_injection_unresolved() -> None:
"""无依赖、无静态值、无默认的参数应显示为 <UNRESOLVED>。"""
spec = px.TaskSpec("t", fn, depends_on=("a",))
desc = describe_injection(spec)
assert "**kwargs=<all-deps>" in desc
def fn(missing: int) -> None:
return None
def test_unresolved(self) -> None:
"""无依赖、无静态值、无默认的参数应显示为 <UNRESOLVED>."""
spec = px.TaskSpec("t", fn)
desc = describe_injection(spec)
assert "missing=<UNRESOLVED>" in desc
def fn(missing: int) -> None: # noqa: ARG001
return None
spec = px.TaskSpec("t", fn)
desc = describe_injection(spec)
assert "missing=<UNRESOLVED>" in desc
def test_describe_injection_static_kwargs() -> None:
"""静态 kwargs 应显示具体值"""
def test_static_kwargs(self) -> None:
"""静态 kwargs 应显示具体值."""
def fn(flag: bool = False) -> None:
return None
def fn(flag: bool = False) -> None: # noqa: ARG001
return None
spec = px.TaskSpec("t", fn, kwargs={"flag": True})
desc = describe_injection(spec)
assert "flag=True" in desc
spec = px.TaskSpec("t", fn, kwargs={"flag": True})
desc = describe_injection(spec)
assert "flag=True" in desc
def test_positional_args_filled(self) -> None:
"""spec.args 填充的位置参数应显示具体值(覆盖 args_filled 分支)."""
def test_describe_injection_positional_args_filled() -> None:
"""spec.args 填充的位置参数应显示具体值(覆盖 args_filled 分支)。"""
def fn(a: int, b: str) -> None: # noqa: ARG001
return None
def fn(a: int, b: str) -> None:
return None
spec = px.TaskSpec("t", fn, args=(1, "x"))
desc = describe_injection(spec)
assert "a=1" in desc
assert "b='x'" in desc
spec = px.TaskSpec("t", fn, args=(1, "x"))
desc = describe_injection(spec)
assert "a=1" in desc
assert "b='x'" in desc
class TestIsContextAnnotation:
"""测试 _is_context_annotation 函数."""
# ---------------------------------------------------------------------- #
# build_call_args 边界
# ---------------------------------------------------------------------- #
def test_build_call_args_var_positional_not_required() -> None:
"""*args 参数不应触发 InjectionError。"""
def test_direct_object(self) -> None:
"""直接传入 Context 别名对象应返回 True."""
assert _is_context_annotation(px.Context) is True
def fn(*args: Any) -> int:
return len(args)
def test_string(self) -> None:
"""字符串形式的注解应被识别."""
assert _is_context_annotation("Context") is True
assert _is_context_annotation("px.Context") is True
assert _is_context_annotation("pyflowx.Context") is True
assert _is_context_annotation("NotContext") is False
assert _is_context_annotation("int") is False
spec = px.TaskSpec("t", fn, args=(1, 2, 3))
args, kwargs = build_call_args(spec, {})
assert args == (1, 2, 3)
assert kwargs == {}
def test_typing_alias(self) -> None:
"""具有 __name__/_name 为 Context/Mapping 的 typing 别名应返回 True."""
class FakeAlias:
__name__ = "Context"
def test_build_call_args_var_keyword_consumes_leftover() -> None:
"""**kwargs 应吞掉未被具名参数消费的依赖结果。"""
assert _is_context_annotation(FakeAlias()) is True
def fn(a: int, **rest: Any) -> int:
return a + sum(rest.values())
class FakeMapping:
__name__ = "Mapping"
spec = px.TaskSpec("t", fn, depends_on=("a", "b", "c"))
args, kwargs = build_call_args(spec, {"a": 1, "b": 2, "c": 3})
assert kwargs == {"a": 1, "b": 2, "c": 3}
assert _is_context_annotation(FakeMapping()) is True
def test_build_call_args_no_var_keyword_drops_leftover() -> None:
"""无 **kwargs 时,未被消费的依赖结果被丢弃(不报错)。"""
def fn(a: int) -> int:
return a
spec = px.TaskSpec("t", fn, depends_on=("a", "b"))
# b 是依赖但 fn 不接收它 —— 应正常工作
args, kwargs = build_call_args(spec, {"a": 1, "b": 2})
assert kwargs == {"a": 1}
def test_build_call_args_context_annotation_only_deps() -> None:
"""Context 标注只接收该任务自身 depends_on 的结果。"""
def fn(ctx: px.Context) -> int:
return len(ctx)
spec = px.TaskSpec("t", fn, depends_on=("a", "b"))
args, kwargs = build_call_args(spec, {"a": 1, "b": 2, "c": 99})
assert kwargs == {"ctx": {"a": 1, "b": 2}}
def test_other(self) -> None:
"""其他类型注解应返回 False."""
assert _is_context_annotation(int) is False
assert _is_context_annotation(str) is False
assert _is_context_annotation(None) is False
+12 -12
View File
@@ -3,11 +3,11 @@
from __future__ import annotations
import asyncio
import os
import tempfile
import threading
import time
from typing import Any, List
from pathlib import Path
from typing import Any
import pytest
@@ -39,7 +39,7 @@ def test_sequential_basic() -> None:
def test_sequential_diamond() -> None:
order: List[str] = []
order: list[str] = []
def make(name: str) -> Any:
def fn() -> str:
@@ -66,7 +66,7 @@ def test_failure_propagates() -> None:
def boom() -> None:
raise ValueError("kaboom")
def downstream(boom: None) -> int:
def downstream(_boom: None) -> int:
return 1
graph = px.Graph.from_specs(
@@ -131,7 +131,7 @@ def test_threaded_parallelism() -> None:
def test_threaded_layer_barrier() -> None:
finished: List[str] = []
finished: list[str] = []
lock = threading.Lock()
def make(name: str) -> Any:
@@ -231,7 +231,7 @@ def test_async_timeout() -> None:
# Dry run
# ---------------------------------------------------------------------- #
def test_dry_run_does_not_execute(capsys: pytest.CaptureFixture[str]) -> None:
called: List[str] = []
called: list[str] = []
def fn() -> str:
called.append("x")
@@ -250,7 +250,7 @@ def test_dry_run_does_not_execute(capsys: pytest.CaptureFixture[str]) -> None:
# State / resume
# ---------------------------------------------------------------------- #
def test_memory_backend_resume() -> None:
runs: List[str] = []
runs: list[str] = []
def make(name: str) -> Any:
def fn() -> str:
@@ -276,7 +276,7 @@ def test_memory_backend_resume() -> None:
def test_json_backend_persistence() -> None:
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "state.json")
path = str(Path(tmp) / "state.json")
def fn() -> int:
return 7
@@ -285,7 +285,7 @@ def test_json_backend_persistence() -> None:
px.run(graph, strategy="sequential", state=JSONBackend(path))
# New backend reads the file; task should be skipped.
runs: List[str] = []
runs: list[str] = []
def fn2() -> int:
runs.append("ran")
@@ -301,7 +301,7 @@ def test_json_backend_persistence() -> None:
# Events
# ---------------------------------------------------------------------- #
def test_on_event_callback() -> None:
events: List[px.TaskEvent] = []
events: list[px.TaskEvent] = []
def fn() -> int:
return 1
@@ -390,7 +390,7 @@ def test_async_failure_retry_branch(caplog: pytest.LogCaptureFixture) -> None:
# ---------------------------------------------------------------------- #
def test_threaded_skips_cached_tasks() -> None:
"""threaded 策略下命中缓存的任务应被跳过(覆盖 line 224-230)。"""
runs: List[str] = []
runs: list[str] = []
def make(name: str) -> Any:
def fn() -> str:
@@ -426,7 +426,7 @@ def test_threaded_all_cached_layer() -> None:
def test_async_skips_cached_tasks() -> None:
"""async 策略下命中缓存的任务应被跳过(覆盖 line 268-274)。"""
runs: List[str] = []
runs: list[str] = []
async def make(name: str) -> Any:
async def fn() -> str:
+12 -11
View File
@@ -39,7 +39,7 @@ def test_from_specs_allows_forward_references() -> None:
def test_duplicate_task_raises() -> None:
with pytest.raises(DuplicateTaskError):
px.Graph.from_specs(
_ = px.Graph.from_specs(
[
px.TaskSpec("a", _fn),
px.TaskSpec("a", _fn),
@@ -49,14 +49,15 @@ def test_duplicate_task_raises() -> None:
def test_missing_dependency_raises() -> None:
with pytest.raises(MissingDependencyError) as exc_info:
px.Graph.from_specs([px.TaskSpec("b", _fn, depends_on=("a",))])
_ = px.Graph.from_specs([px.TaskSpec("b", _fn, depends_on=("a",))])
assert exc_info.value.task == "b"
assert exc_info.value.dependency == "a"
def test_cycle_detection() -> None:
with pytest.raises(CycleError):
px.Graph.from_specs(
_ = px.Graph.from_specs(
[
px.TaskSpec("a", _fn, depends_on=("c",)),
px.TaskSpec("b", _fn, depends_on=("a",)),
@@ -80,7 +81,7 @@ def test_layers_grouping() -> None:
def test_self_dependency_rejected() -> None:
with pytest.raises(ValueError):
px.TaskSpec("a", _fn, depends_on=("a",))
_ = px.TaskSpec("a", _fn, depends_on=("a",))
def test_to_mermaid() -> None:
@@ -99,7 +100,7 @@ def test_to_mermaid() -> None:
def test_to_mermaid_invalid_orientation() -> None:
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)])
with pytest.raises(ValueError):
graph.to_mermaid("XX")
_ = graph.to_mermaid("XX")
def test_subgraph_by_tags() -> None:
@@ -134,7 +135,7 @@ def test_subgraph_by_names() -> None:
def test_subgraph_by_names_unknown() -> None:
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)])
with pytest.raises(KeyError):
graph.subgraph_by_names(["nope"])
_ = graph.subgraph_by_names(["nope"])
def test_describe() -> None:
@@ -160,14 +161,14 @@ def test_add_chains_and_validates() -> None:
assert "a" in graph
# 缺失依赖应即时报错
with pytest.raises(MissingDependencyError):
graph.add(px.TaskSpec("b", _fn, depends_on=("missing",)))
_ = graph.add(px.TaskSpec("b", _fn, depends_on=("missing",)))
def test_add_duplicate_raises() -> None:
graph = px.Graph()
graph.add(px.TaskSpec("a", _fn))
_ = graph.add(px.TaskSpec("a", _fn))
with pytest.raises(DuplicateTaskError):
graph.add(px.TaskSpec("a", _fn))
_ = graph.add(px.TaskSpec("a", _fn))
def test_all_specs_returns_view() -> None:
@@ -182,7 +183,7 @@ def test_spec_accessor() -> None:
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)])
assert graph.spec("a").name == "a"
with pytest.raises(KeyError):
graph.spec("missing")
_ = graph.spec("missing")
def test_dependencies_accessor() -> None:
@@ -213,7 +214,7 @@ def test_subgraph_preserves_metadata() -> None:
graph = px.Graph.from_specs(
[
px.TaskSpec("a", _fn, tags=("x",), retries=3, timeout=5.0),
px.TaskSpec("b", _fn, ("a",), tags=("y",)),
px.TaskSpec("b", _fn, depends_on=("a",), tags=("y",)),
]
)
sub = graph.subgraph(["x"])
+90 -81
View File
@@ -1,9 +1,8 @@
"""RunReport 测试"""
"""RunReport 测试."""
from __future__ import annotations
from datetime import datetime
from typing import Optional
from datetime import datetime, timedelta
import pyflowx as px
from pyflowx.task import TaskResult, TaskSpec, TaskStatus
@@ -17,15 +16,14 @@ def _make_result(
name: str = "a",
status: TaskStatus = TaskStatus.SUCCESS,
value: object = 42,
error: Optional[object] = None,
error: BaseException | None = None,
duration: float = 0.5,
attempts: int = 1,
) -> TaskResult[object]:
"""构造测试用 TaskResult 实例."""
spec: TaskSpec[object] = TaskSpec[object](name, _fn)
start = datetime(2024, 1, 1, 0, 0, 0)
# 用 timedelta 精确表达秒数,避免 int() 截断小数
from datetime import timedelta
end = start + timedelta(seconds=duration) if duration else None
return TaskResult[object](
spec=spec,
@@ -38,85 +36,96 @@ def _make_result(
)
def test_getitem_returns_value() -> None:
report = px.RunReport()
report.results["a"] = _make_result("a", value=7)
assert report["a"] == 7
class TestRunReportAccess:
"""测试 RunReport 的访问接口."""
def test_getitem_returns_value(self) -> None:
"""report[name] 应返回任务结果值."""
report = px.RunReport()
report.results["a"] = _make_result("a", value=7)
assert report["a"] == 7
def test_result_of_returns_full_result(self) -> None:
"""result_of 应返回完整的 TaskResult 对象."""
report = px.RunReport()
r = _make_result("a")
report.results["a"] = r
assert report.result_of("a") is r
def test_contains(self) -> None:
"""in 运算符应正确判断任务是否存在."""
report = px.RunReport()
report.results["a"] = _make_result("a")
assert "a" in report
assert "b" not in report
def test_iter_and_len(self) -> None:
"""应支持迭代任务名并返回任务数量."""
report = px.RunReport()
report.results["a"] = _make_result("a")
report.results["b"] = _make_result("b")
assert list(report) == ["a", "b"]
assert len(report) == 2
def test_result_of_returns_full_result() -> None:
report = px.RunReport()
r = _make_result("a")
report.results["a"] = r
assert report.result_of("a") is r
class TestRunReportSummary:
"""测试 RunReport 的 summary 方法."""
def test_summary_success(self) -> None:
"""应正确汇总成功和跳过的任务."""
report = px.RunReport()
report.results["a"] = _make_result("a", status=TaskStatus.SUCCESS, duration=1.0)
report.results["b"] = _make_result("b", status=TaskStatus.SKIPPED, duration=0.0)
s = report.summary()
assert s["success"] is True
assert s["total_tasks"] == 2
assert s["by_status"] == {"success": 1, "skipped": 1}
assert s["total_duration_seconds"] == 1.0
def test_summary_with_none_duration(self) -> None:
"""未开始/未结束的任务 duration 为 None,不应计入总时长."""
report = px.RunReport()
spec: TaskSpec[object] = TaskSpec("a", _fn) # type: ignore[arg-type]
report.results["a"] = TaskResult(spec=spec, status=TaskStatus.FAILED)
s = report.summary()
assert s["total_duration_seconds"] == 0.0
def test_failed_tasks(self) -> None:
"""failed_tasks 应返回所有失败任务名."""
report = px.RunReport()
report.results["a"] = _make_result("a", status=TaskStatus.SUCCESS)
report.results["b"] = _make_result(
"b", status=TaskStatus.FAILED, error=ValueError("x")
)
assert report.failed_tasks() == ["b"]
def test_contains() -> None:
report = px.RunReport()
report.results["a"] = _make_result("a")
assert "a" in report
assert "b" not in report
class TestRunReportDescribe:
"""测试 RunReport 的 describe 方法."""
def test_describe_success(self) -> None:
"""应正确描述成功状态和耗时."""
report = px.RunReport()
report.results["a"] = _make_result("a", status=TaskStatus.SUCCESS, duration=0.5)
desc = report.describe()
assert "RunReport(success=True)" in desc
assert "a: success" in desc
assert "0.500s" in desc
def test_iter_and_len() -> None:
report = px.RunReport()
report.results["a"] = _make_result("a")
report.results["b"] = _make_result("b")
assert list(report) == ["a", "b"]
assert len(report) == 2
def test_describe_with_error(self) -> None:
"""应正确描述失败状态和错误信息."""
report = px.RunReport(success=False)
report.results["a"] = _make_result(
"a", status=TaskStatus.FAILED, error=ValueError("boom"), duration=0.1
)
desc = report.describe()
assert "success=False" in desc
assert "error=ValueError" in desc
def test_summary_success() -> None:
report = px.RunReport()
report.results["a"] = _make_result("a", status=TaskStatus.SUCCESS, duration=1.0)
report.results["b"] = _make_result("b", status=TaskStatus.SKIPPED, duration=0.0)
s = report.summary()
assert s["success"] is True
assert s["total_tasks"] == 2
assert s["by_status"] == {"success": 1, "skipped": 1}
assert s["total_duration_seconds"] == 1.0
def test_summary_with_none_duration() -> None:
"""未开始/未结束的任务 duration 为 None,不应计入总时长。"""
report = px.RunReport()
spec: TaskSpec[object] = TaskSpec("a", _fn) # type: ignore[arg-type]
report.results["a"] = TaskResult(spec=spec, status=TaskStatus.FAILED)
s = report.summary()
assert s["total_duration_seconds"] == 0.0
def test_failed_tasks() -> None:
report = px.RunReport()
report.results["a"] = _make_result("a", status=TaskStatus.SUCCESS)
report.results["b"] = _make_result(
"b", status=TaskStatus.FAILED, error=ValueError("x")
)
assert report.failed_tasks() == ["b"]
def test_describe_success() -> None:
report = px.RunReport()
report.results["a"] = _make_result("a", status=TaskStatus.SUCCESS, duration=0.5)
desc = report.describe()
assert "RunReport(success=True)" in desc
assert "a: success" in desc
assert "0.500s" in desc
def test_describe_with_error() -> None:
report = px.RunReport(success=False)
report.results["a"] = _make_result(
"a", status=TaskStatus.FAILED, error=ValueError("boom"), duration=0.1
)
desc = report.describe()
assert "success=False" in desc
assert "error=ValueError" in desc
def test_describe_no_duration() -> None:
report = px.RunReport()
spec: TaskSpec[object] = TaskSpec("a", _fn) # type: ignore[arg-type]
report.results["a"] = TaskResult(spec=spec, status=TaskStatus.PENDING)
desc = report.describe()
assert "-" in desc # duration 显示为 "-"
def test_describe_no_duration(self) -> None:
"""无耗时的任务应显示为 '-'."""
report = px.RunReport()
spec: TaskSpec[object] = TaskSpec("a", _fn) # type: ignore[arg-type]
report.results["a"] = TaskResult(spec=spec, status=TaskStatus.PENDING)
desc = report.describe()
assert "-" in desc # duration 显示为 "-"
+18 -25
View File
@@ -3,7 +3,7 @@
from __future__ import annotations
import sys
from typing import Any, List
from typing import Any
from unittest.mock import patch
import pytest
@@ -77,12 +77,12 @@ class TestCliRunnerConstruction:
def test_rejects_non_graph_value(self) -> None:
"""非 Graph 值应抛出 TypeError."""
with pytest.raises(TypeError, match="必须是 Graph 实例"):
px.CliRunner(clean="not a graph") # type: ignore[arg-type]
_ = px.CliRunner(clean="not a graph") # type: ignore[arg-type] # pyright: ignore[reportArgumentType]
def test_rejects_non_graph_list(self) -> None:
"""列表类型的值应抛出 TypeError."""
with pytest.raises(TypeError, match="必须是 Graph 实例"):
px.CliRunner(build=[1, 2, 3]) # type: ignore[arg-type]
_ = px.CliRunner(build=[1, 2, 3]) # type: ignore[arg-type] # pyright: ignore[reportArgumentType]
def test_default_strategy_is_sequential(self) -> None:
"""默认策略应为 Strategy.SEQUENTIAL."""
@@ -257,19 +257,15 @@ class TestCliRunnerParser:
class TestCliRunnerRunSuccess:
"""测试 CliRunner.run 的成功执行路径."""
def test_run_valid_command_returns_zero(
self, capsys: pytest.CaptureFixture[str]
) -> None:
def test_run_valid_command_returns_zero(self) -> None:
"""有效命令执行成功应返回 0."""
runner = px.CliRunner(echo=_echo_graph())
exit_code = runner.run(["echo"])
assert exit_code == CliExitCode.SUCCESS.value
def test_run_executes_correct_graph(
self, capsys: pytest.CaptureFixture[str]
) -> None:
def test_run_executes_correct_graph(self) -> None:
"""应执行用户指定的命令对应的图."""
executed: List[str] = []
executed: list[str] = []
def track_a() -> None:
executed.append("a")
@@ -418,9 +414,7 @@ class TestCliRunnerRunFailure:
captured = capsys.readouterr()
assert "可用命令" in captured.out or "可用命令" in captured.err
def test_run_failing_task_returns_failure(
self, capsys: pytest.CaptureFixture[str]
) -> None:
def test_run_failing_task_returns_failure(self) -> None:
"""任务失败时应返回 1."""
runner = px.CliRunner(fail=_failing_graph())
exit_code = runner.run(["fail"])
@@ -443,7 +437,7 @@ class TestCliRunnerRunFailure:
class TestCliRunnerList:
"""测试 --list 选项."""
def test_list_returns_success(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_list_returns_success(self) -> None:
"""--list 应返回 0."""
runner = px.CliRunner(clean=_echo_graph(), build=_echo_graph())
exit_code = runner.run(["--list"])
@@ -462,11 +456,9 @@ class TestCliRunnerList:
assert "build" in captured.out
assert "test" in captured.out
def test_list_does_not_execute_any_graph(
self, capsys: pytest.CaptureFixture[str]
) -> None:
def test_list_does_not_execute_any_graph(self) -> None:
"""--list 不应执行任何图."""
executed: List[str] = []
executed: list[str] = []
def track() -> None:
executed.append("ran")
@@ -488,7 +480,7 @@ class TestCliRunnerErrorHandling:
"""KeyboardInterrupt 应返回 130."""
runner = px.CliRunner(echo=_echo_graph())
def raise_interrupt(*args: Any, **kwargs: Any) -> None:
def raise_interrupt(*_args: Any, **_kwargs: Any) -> None:
raise KeyboardInterrupt
with patch("pyflowx.runner.run", side_effect=raise_interrupt):
@@ -503,7 +495,7 @@ class TestCliRunnerErrorHandling:
"""PyFlowXError 应返回 1."""
runner = px.CliRunner(echo=_echo_graph())
def raise_error(*args: Any, **kwargs: Any) -> None:
def raise_error(*_args: Any, **_kwargs: Any) -> None:
raise TaskFailedError("echo", RuntimeError("boom"), 1)
with patch("pyflowx.runner.run", side_effect=raise_error):
@@ -520,12 +512,13 @@ class TestCliRunnerErrorHandling:
runner = px.CliRunner(echo=_echo_graph())
def raise_custom(*args: Any, **kwargs: Any) -> None:
def raise_custom(*_args: Any, **_kwargs: Any) -> None:
raise CustomError("unexpected")
with patch("pyflowx.runner.run", side_effect=raise_custom):
with pytest.raises(CustomError):
runner.run(["echo"])
with patch("pyflowx.runner.run", side_effect=raise_custom), pytest.raises(
CustomError
):
runner.run(["echo"])
# ---------------------------------------------------------------------- #
@@ -617,7 +610,7 @@ class TestCliRunnerIntegration:
def test_diamond_dependency_graph(self) -> None:
"""菱形依赖图应正确执行."""
order: List[str] = []
order: list[str] = []
def make(name: str) -> Any:
def fn() -> str:
+13 -26
View File
@@ -22,19 +22,6 @@ def mock_tmp_json(tmp_path: Path) -> Path:
return path
class TestStateBackend:
"""测试状态后端。"""
def test_json_backend_save_and_load(self, mock_tmp_json: Path) -> None:
"""测试 JSON 后端保存和加载。"""
b = JSONBackend(str(mock_tmp_json))
assert not b.has("a")
b.save("a", 1)
assert b.has("a")
assert b.get("a") == 1
assert dict(b.load()) == {"a": 1}
# ---------------------------------------------------------------------- #
# MemoryBackend
# ---------------------------------------------------------------------- #
@@ -61,7 +48,7 @@ def test_memory_backend_get_missing_raises() -> None:
# ---------------------------------------------------------------------- #
def test_json_backend_save_and_load() -> None:
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "state.json")
path = str(Path(tmp) / "state.json")
b = JSONBackend(path)
b.save("a", {"x": 1})
b.save("b", [1, 2, 3])
@@ -75,7 +62,7 @@ def test_json_backend_save_and_load() -> None:
def test_json_backend_clear() -> None:
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "state.json")
path = str(Path(tmp) / "state.json")
b = JSONBackend(path)
b.save("a", 1)
b.clear()
@@ -88,7 +75,7 @@ def test_json_backend_clear() -> None:
def test_json_backend_nonexistent_file_starts_empty() -> None:
"""文件不存在时应正常初始化为空。"""
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "absent.json")
path = str(Path(tmp) / "absent.json")
b = JSONBackend(path)
assert dict(b.load()) == {}
assert not b.has("anything")
@@ -97,7 +84,7 @@ def test_json_backend_nonexistent_file_starts_empty() -> None:
def test_json_backend_non_serialisable_raises() -> None:
"""不可 JSON 序列化的值应抛 StorageError,且不污染内存状态。"""
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "state.json")
path = str(Path(tmp) / "state.json")
b = JSONBackend(path)
with pytest.raises(StorageError):
b.save("a", object()) # object() 不可序列化
@@ -113,12 +100,12 @@ def test_json_backend_flush_type_error(monkeypatch: pytest.MonkeyPatch) -> None:
import json as _json
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "state.json")
path = str(Path(tmp) / "state.json")
b = JSONBackend(path)
original_dump = _json.dump
def flaky_dump(*args: Any, **kwargs: Any) -> None:
def flaky_dump(*_args: Any, **_kwargs: Any) -> None:
raise TypeError("simulated flush failure")
monkeypatch.setattr(_json, "dump", flaky_dump)
@@ -131,28 +118,28 @@ def test_json_backend_flush_type_error(monkeypatch: pytest.MonkeyPatch) -> None:
def test_json_backend_flush_os_error(monkeypatch: pytest.MonkeyPatch) -> None:
"""_flush 时 OSError 应转为 StorageError。"""
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "state.json")
path = str(Path(tmp) / "state.json")
b = JSONBackend(path)
original_replace = os.replace
def fail_replace(*args: Any, **kwargs: Any) -> None:
def fail_replace(*_args: Any, **_kwargs: Any) -> None:
raise OSError("simulated os.replace failure")
monkeypatch.setattr(os, "replace", fail_replace)
monkeypatch.setattr(Path, "replace", fail_replace)
with pytest.raises(StorageError, match="cannot write"):
b.save("a", 1)
monkeypatch.setattr(os, "replace", original_replace)
def test_json_backend_corrupt_file_raises(tmp_path: Path) -> None:
def test_json_backend_corrupt_file_raises() -> None:
"""损坏的 JSON 文件应抛 StorageError。"""
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "state.json")
path = str(Path(tmp) / "state.json")
with open(path, "w", encoding="utf-8") as fh:
fh.write("{not valid json")
_ = fh.write("{not valid json")
with pytest.raises(StorageError):
JSONBackend(path)
_ = JSONBackend(path)
def test_json_backend_non_dict_content_ignored(tmp_path: Path) -> None:
+1 -1
View File
@@ -356,7 +356,7 @@ class TestTaskSpecVerbose:
def test_verbose_default_is_false(self) -> None:
"""verbose 默认应为 False."""
spec = px.TaskSpec("a", cmd=[*ECHO_CMD, "hi"])
spec: px.TaskSpec[object] = px.TaskSpec("a", cmd=[*ECHO_CMD, "hi"])
assert spec.verbose is False
def test_verbose_true_prints_command(