83da5135d0
- add test_all_deps_combines_hard_and_soft to verify all_deps returns correct hard+soft deps in order - add multiple tests for GraphDefaults field inheritance, including normal inheritance and non-override of custom values
342 lines
12 KiB
Python
342 lines
12 KiB
Python
"""Tests for Graph construction, validation, layering and subgraphs."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
import pyflowx as px
|
|
from pyflowx.errors import CycleError, DuplicateTaskError, MissingDependencyError
|
|
from pyflowx.graph import GraphComposer, compose
|
|
|
|
|
|
def _fn() -> None:
|
|
return None
|
|
|
|
|
|
def test_from_specs_builds_graph() -> None:
|
|
graph = px.Graph.from_specs([
|
|
px.TaskSpec("a", _fn),
|
|
px.TaskSpec("b", _fn, depends_on=("a",)),
|
|
px.TaskSpec("c", _fn, depends_on=("a", "b")),
|
|
])
|
|
assert set(graph.names) == {"a", "b", "c"}
|
|
assert graph.dependencies("c") == ("a", "b")
|
|
assert len(graph) == 3
|
|
assert "a" in graph
|
|
|
|
|
|
def test_from_specs_allows_forward_references() -> None:
|
|
# b depends on a, but a is declared after b — order should not matter.
|
|
graph = px.Graph.from_specs([
|
|
px.TaskSpec("b", _fn, depends_on=("a",)),
|
|
px.TaskSpec("a", _fn),
|
|
])
|
|
assert graph.layers() == [["a"], ["b"]]
|
|
|
|
|
|
def test_duplicate_task_raises() -> None:
|
|
with pytest.raises(DuplicateTaskError):
|
|
_ = px.Graph.from_specs([
|
|
px.TaskSpec("a", _fn),
|
|
px.TaskSpec("a", _fn),
|
|
])
|
|
|
|
|
|
def test_missing_dependency_raises() -> None:
|
|
with pytest.raises(MissingDependencyError) as exc_info:
|
|
_ = px.Graph.from_specs([px.TaskSpec("b", _fn, depends_on=("a",))])
|
|
|
|
assert exc_info.value.task == "b"
|
|
assert exc_info.value.dependency == "a"
|
|
|
|
|
|
def test_cycle_detection() -> None:
|
|
with pytest.raises(CycleError):
|
|
_ = px.Graph.from_specs([
|
|
px.TaskSpec("a", _fn, depends_on=("c",)),
|
|
px.TaskSpec("b", _fn, depends_on=("a",)),
|
|
px.TaskSpec("c", _fn, depends_on=("b",)),
|
|
])
|
|
|
|
|
|
def test_layers_grouping() -> None:
|
|
graph = px.Graph.from_specs([
|
|
px.TaskSpec("a", _fn),
|
|
px.TaskSpec("b", _fn),
|
|
px.TaskSpec("c", _fn, depends_on=("a", "b")),
|
|
px.TaskSpec("d", _fn, depends_on=("c",)),
|
|
])
|
|
layers = graph.layers()
|
|
assert layers == [["a", "b"], ["c"], ["d"]]
|
|
|
|
|
|
def test_self_dependency_rejected() -> None:
|
|
with pytest.raises(ValueError):
|
|
_ = px.TaskSpec("a", _fn, depends_on=("a",))
|
|
|
|
|
|
def test_to_mermaid() -> None:
|
|
graph = px.Graph.from_specs([
|
|
px.TaskSpec("a", _fn),
|
|
px.TaskSpec("b", _fn, depends_on=("a",)),
|
|
])
|
|
mermaid = graph.to_mermaid()
|
|
assert mermaid.startswith("graph TD")
|
|
assert 'a["a"]' in mermaid
|
|
assert "a --> b" in mermaid
|
|
|
|
|
|
def test_to_mermaid_invalid_orientation() -> None:
|
|
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)])
|
|
with pytest.raises(ValueError):
|
|
_ = graph.to_mermaid("XX")
|
|
|
|
|
|
def test_subgraph_by_tags() -> None:
|
|
graph = px.Graph.from_specs([
|
|
px.TaskSpec("a", _fn, tags=("ingest",)),
|
|
px.TaskSpec("b", _fn, depends_on=("a",), tags=("ingest",)),
|
|
px.TaskSpec("c", _fn, depends_on=("b",), tags=("report",)),
|
|
])
|
|
sub = graph.subgraph(["ingest"])
|
|
assert set(sub.names) == {"a", "b"}
|
|
# Edge to dropped task c is removed; b no longer waits for anything
|
|
# outside the subgraph (c was never a dep of b anyway).
|
|
assert sub.dependencies("b") == ("a",)
|
|
|
|
|
|
def test_subgraph_by_names() -> None:
|
|
graph = px.Graph.from_specs([
|
|
px.TaskSpec("a", _fn),
|
|
px.TaskSpec("b", _fn, depends_on=("a",)),
|
|
px.TaskSpec("c", _fn, depends_on=("b",)),
|
|
])
|
|
sub = graph.subgraph_by_names(["a", "b"])
|
|
assert set(sub.names) == {"a", "b"}
|
|
# c is dropped, so b's dep on c (none here) — but a->b edge preserved.
|
|
assert sub.dependencies("b") == ("a",)
|
|
|
|
|
|
def test_subgraph_by_names_unknown() -> None:
|
|
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)])
|
|
with pytest.raises(KeyError):
|
|
_ = graph.subgraph_by_names(["nope"])
|
|
|
|
|
|
def test_describe() -> None:
|
|
graph = px.Graph.from_specs([
|
|
px.TaskSpec("a", _fn),
|
|
px.TaskSpec("b", _fn, depends_on=("a",)),
|
|
])
|
|
desc = graph.describe()
|
|
assert "Layer 1" in desc
|
|
assert "Layer 2" in desc
|
|
|
|
|
|
# ---------------------------------------------------------------------- #
|
|
# 增量 add API 与其他访问器
|
|
# ---------------------------------------------------------------------- #
|
|
def test_add_chains_and_validates() -> None:
|
|
"""add() 应返回 self 以支持链式调用,并即时校验。"""
|
|
graph = px.Graph()
|
|
ret = graph.add(px.TaskSpec("a", _fn))
|
|
assert ret is graph
|
|
assert "a" in graph
|
|
# 缺失依赖应即时报错
|
|
with pytest.raises(MissingDependencyError):
|
|
_ = graph.add(px.TaskSpec("b", _fn, depends_on=("missing",)))
|
|
|
|
|
|
def test_add_duplicate_raises() -> None:
|
|
graph = px.Graph()
|
|
_ = graph.add(px.TaskSpec("a", _fn))
|
|
with pytest.raises(DuplicateTaskError):
|
|
_ = graph.add(px.TaskSpec("a", _fn))
|
|
|
|
|
|
def test_all_specs_returns_view() -> None:
|
|
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)])
|
|
view = graph.all_specs()
|
|
assert set(view.keys()) == {"a"}
|
|
# 返回的是只读视图,修改不影响内部
|
|
assert view is graph.all_specs() or view == graph.all_specs()
|
|
|
|
|
|
def test_all_deps_combines_hard_and_soft() -> None:
|
|
"""all_deps 应返回硬依赖 + 软依赖的组合。"""
|
|
graph = px.Graph.from_specs([
|
|
px.TaskSpec("a", _fn),
|
|
px.TaskSpec("b", _fn),
|
|
px.TaskSpec("c", _fn, depends_on=("a",), soft_depends_on=("b",)),
|
|
])
|
|
all_deps = graph.all_deps("c")
|
|
assert set(all_deps) == {"a", "b"}
|
|
# 硬依赖在前,软依赖在后
|
|
assert all_deps == ("a", "b")
|
|
|
|
|
|
def test_spec_accessor() -> None:
|
|
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)])
|
|
assert graph.spec("a").name == "a"
|
|
with pytest.raises(KeyError):
|
|
_ = graph.spec("missing")
|
|
|
|
|
|
def test_dependencies_accessor() -> None:
|
|
graph = px.Graph.from_specs([
|
|
px.TaskSpec("a", _fn),
|
|
px.TaskSpec("b", _fn, depends_on=("a",)),
|
|
])
|
|
assert graph.dependencies("a") == ()
|
|
assert graph.dependencies("b") == ("a",)
|
|
|
|
|
|
def test_repr() -> None:
|
|
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)])
|
|
assert repr(graph) == "Graph(tasks=1)"
|
|
|
|
|
|
def test_empty_graph_layers() -> None:
|
|
"""空图的 layers() 应返回空列表。"""
|
|
graph = px.Graph()
|
|
assert graph.layers() == []
|
|
assert graph.to_mermaid() == "graph TD\n"
|
|
|
|
|
|
def test_subgraph_preserves_metadata() -> None:
|
|
"""子图应保留原任务的 retry/timeout/tags 等元数据。"""
|
|
graph = px.Graph.from_specs([
|
|
px.TaskSpec(
|
|
"a",
|
|
_fn,
|
|
tags=("x",),
|
|
retry=px.RetryPolicy(max_attempts=3),
|
|
timeout=5.0,
|
|
),
|
|
px.TaskSpec("b", _fn, depends_on=("a",), tags=("y",)),
|
|
])
|
|
sub = graph.subgraph(["x"])
|
|
spec = sub.spec("a")
|
|
assert spec.retry.max_attempts == 3
|
|
assert spec.timeout == 5.0
|
|
assert spec.tags == ("x",)
|
|
|
|
|
|
def test_subgraph_by_tags_no_match() -> None:
|
|
"""无匹配 tag 时返回空图。"""
|
|
graph = px.Graph.from_specs([px.TaskSpec("a", _fn, tags=("x",))])
|
|
sub = graph.subgraph(["z"])
|
|
assert len(sub) == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------- #
|
|
# from_specs str 类型分支测试
|
|
# ---------------------------------------------------------------------- #
|
|
def test_from_specs_with_string_ref() -> None:
|
|
"""from_specs 接受字符串引用并收集到 pending_refs."""
|
|
# 字符串引用被收集到 _pending_refs,而非尝试打开文件
|
|
graph = px.Graph.from_specs(["ref_cmd"])
|
|
assert graph._pending_refs == ["ref_cmd"]
|
|
|
|
|
|
def test_from_specs_with_invalid_type() -> None:
|
|
"""from_specs 接受不支持的类型时应抛 TypeError."""
|
|
with pytest.raises(TypeError, match="from_specs 只接受 TaskSpec 或 str"):
|
|
_ = px.Graph.from_specs([123]) # type: ignore[list-item]
|
|
|
|
|
|
# ---------------------------------------------------------------------- #
|
|
# to_mermaid 软依赖测试
|
|
# ---------------------------------------------------------------------- #
|
|
def test_to_mermaid_soft_depends_on() -> None:
|
|
"""to_mermaid 应正确绘制软依赖为虚线."""
|
|
graph = px.Graph.from_specs([
|
|
px.TaskSpec("a", _fn),
|
|
px.TaskSpec("b", _fn, soft_depends_on=("a",)),
|
|
])
|
|
mermaid = graph.to_mermaid()
|
|
assert "a -.-> b" in mermaid # 软依赖用虚线
|
|
|
|
|
|
# ---------------------------------------------------------------------- #
|
|
# GraphComposer 与 compose 测试
|
|
# ---------------------------------------------------------------------- #
|
|
def test_graph_composer_resolve_all() -> None:
|
|
"""GraphComposer.resolve_all 应展开所有图的字符串引用."""
|
|
graph_a = px.Graph.from_specs([px.TaskSpec("a1", _fn), px.TaskSpec("a2", _fn, depends_on=("a1",))])
|
|
# 创建带 _pending_refs 的图
|
|
graph_b = px.Graph.from_specs([px.TaskSpec("b1", _fn)])
|
|
graph_b._pending_refs = ["cmd_a"] # 手动设置内部属性
|
|
|
|
composer = GraphComposer({"cmd_a": graph_a, "cmd_b": graph_b})
|
|
resolved = composer.resolve_all()
|
|
|
|
# graph_b 应包含 graph_a 的任务
|
|
assert "a1" in resolved["cmd_b"]
|
|
assert "a2" in resolved["cmd_b"]
|
|
|
|
|
|
def test_graph_composer_parse_ref_self_reference() -> None:
|
|
"""GraphComposer.parse_ref 应检测循环引用."""
|
|
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)])
|
|
composer = GraphComposer({"cmd": graph})
|
|
with pytest.raises(ValueError, match="循环引用"):
|
|
_ = composer.parse_ref("cmd", "cmd")
|
|
|
|
|
|
def test_graph_composer_parse_ref_cmd_not_found() -> None:
|
|
"""GraphComposer.parse_ref 应检测引用的命令不存在."""
|
|
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)])
|
|
composer = GraphComposer({"cmd": graph})
|
|
with pytest.raises(ValueError, match="引用的命令 'missing' 不存在"):
|
|
_ = composer.parse_ref("missing", "current")
|
|
|
|
|
|
def test_graph_composer_parse_ref_task_not_found() -> None:
|
|
"""GraphComposer.parse_ref 应检测任务不存在于引用的命令中."""
|
|
graph_a = px.Graph.from_specs([px.TaskSpec("a1", _fn)])
|
|
graph_b = px.Graph.from_specs([px.TaskSpec("b1", _fn)])
|
|
composer = GraphComposer({"cmd_a": graph_a, "cmd_b": graph_b})
|
|
with pytest.raises(ValueError, match="任务 'missing' 不存在于命令 'cmd_a' 中"):
|
|
_ = composer.parse_ref("cmd_a.missing", "cmd_b")
|
|
|
|
|
|
def test_graph_composer_expand_refs_no_pending() -> None:
|
|
"""GraphComposer.expand_refs 无 pending_refs 时应原样返回."""
|
|
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)])
|
|
composer = GraphComposer({"cmd": graph})
|
|
expanded = composer.expand_refs(graph, "cmd")
|
|
assert expanded is graph
|
|
|
|
|
|
def test_compose_function() -> None:
|
|
"""compose() 函数应等同于 GraphComposer().resolve_all()。"""
|
|
graph_a = px.Graph.from_specs([px.TaskSpec("a1", _fn)])
|
|
graph_b = px.Graph.from_specs([px.TaskSpec("b1", _fn)])
|
|
graph_b._pending_refs = ["cmd_a"] # 手动设置内部属性
|
|
|
|
resolved = compose({"cmd_a": graph_a, "cmd_b": graph_b})
|
|
assert "a1" in resolved["cmd_b"]
|
|
|
|
|
|
# ---------------------------------------------------------------------- #
|
|
# resolved_spec defaults 测试
|
|
# ---------------------------------------------------------------------- #
|
|
def test_resolved_spec_applies_defaults() -> None:
|
|
"""resolved_spec 应应用 Graph.defaults。"""
|
|
defaults = px.GraphDefaults(timeout=10.0, retry=px.RetryPolicy(max_attempts=2))
|
|
graph = px.Graph.from_specs([px.TaskSpec("a", _fn)], defaults=defaults)
|
|
|
|
resolved = graph.resolved_spec("a")
|
|
assert resolved.timeout == 10.0
|
|
assert resolved.retry.max_attempts == 2
|
|
|
|
|
|
def test_resolved_spec_no_override() -> None:
|
|
"""resolved_spec 不应覆盖任务已有的设置。"""
|
|
defaults = px.GraphDefaults(timeout=10.0)
|
|
graph = px.Graph.from_specs([px.TaskSpec("a", _fn, timeout=5.0)], defaults=defaults)
|
|
|
|
resolved = graph.resolved_spec("a")
|
|
assert resolved.timeout == 5.0 # 保持原值,不被 defaults 覆盖
|