2 Commits

Author SHA1 Message Date
zhou 0afdb54e5c ~
Release / Pre-release Check (push) Failing after 1m31s
Release / Build Artifacts (push) Has been skipped
Release / Publish to PyPI (push) Has been skipped
Release / Publish Release (push) Has been skipped
2026-06-25 12:49:26 +08:00
zhou 9e99a1f1ba ~
Release / Pre-release Check (push) Failing after 31s
Release / Build Artifacts (push) Has been skipped
Release / Publish to PyPI (push) Has been skipped
Release / Publish Release (push) Has been skipped
2026-06-25 12:35:27 +08:00
8 changed files with 719 additions and 24 deletions
+1 -1
View File
@@ -17,7 +17,7 @@ license = { text = "MIT" }
name = "pyflowx" name = "pyflowx"
readme = "README.md" readme = "README.md"
requires-python = ">=3.8" requires-python = ">=3.8"
version = "0.1.8" version = "0.1.12"
[project.scripts] [project.scripts]
autofmt = "pyflowx.cli.autofmt:main" autofmt = "pyflowx.cli.autofmt:main"
+1 -1
View File
@@ -84,7 +84,7 @@ from .runner import CliExitCode, CliRunner
from .storage import JSONBackend, MemoryBackend, StateBackend from .storage import JSONBackend, MemoryBackend, StateBackend
from .task import TaskCmd, TaskEvent, TaskResult, TaskSpec, TaskStatus from .task import TaskCmd, TaskEvent, TaskResult, TaskSpec, TaskStatus
__version__ = "0.1.8" __version__ = "0.1.12"
__all__ = [ __all__ = [
"IS_LINUX", "IS_LINUX",
+1
View File
@@ -23,6 +23,7 @@ EXCLUDE_DIRS = [
".tox", ".tox",
".pytest_cache", ".pytest_cache",
"node_modules", "node_modules",
".ruff_cache",
] ]
EXCLUDE_CMDS = [arg for d in EXCLUDE_DIRS for arg in ["-e", d]] EXCLUDE_CMDS = [arg for d in EXCLUDE_DIRS for arg in ["-e", d]]
+18 -12
View File
@@ -20,13 +20,15 @@ def maturin_build_cmd() -> list[str]:
""" """
command = ["maturin", "build", "-r"].copy() command = ["maturin", "build", "-r"].copy()
if Constants.IS_WINDOWS: if Constants.IS_WINDOWS:
command.extend([ command.extend(
"--target", [
"x86_64-win7-windows-msvc", "--target",
"-Zbuild-std", "x86_64-win7-windows-msvc",
"-i", "-Zbuild-std",
"python3.8", "-i",
]) "python3.8",
]
)
return command return command
@@ -47,6 +49,7 @@ test_coverage: px.TaskSpec = px.TaskSpec(
ruff_lint: px.TaskSpec = px.TaskSpec("lint", cmd=["ruff", "check", "--fix", "--unsafe-fixes"]) ruff_lint: px.TaskSpec = px.TaskSpec("lint", cmd=["ruff", "check", "--fix", "--unsafe-fixes"])
ruff_format: px.TaskSpec = px.TaskSpec("format", cmd=["ruff", "format", "."], depends_on=("lint",)) ruff_format: px.TaskSpec = px.TaskSpec("format", cmd=["ruff", "format", "."], depends_on=("lint",))
typecheck: px.TaskSpec = px.TaskSpec("pyrefly_check", cmd=["pyrefly", "check", "."]) typecheck: px.TaskSpec = px.TaskSpec("pyrefly_check", cmd=["pyrefly", "check", "."])
git_add_all: px.TaskSpec = px.TaskSpec("git_add_all", cmd=["git", "add", "-A"])
bump: px.TaskSpec = px.TaskSpec("bumpversion", cmd=["bumpversion", "-t"]) bump: px.TaskSpec = px.TaskSpec("bumpversion", cmd=["bumpversion", "-t"])
doc: px.TaskSpec = px.TaskSpec("doc", cmd=["sphinx-build", "-b", "html", "docs", "docs/_build"]) doc: px.TaskSpec = px.TaskSpec("doc", cmd=["sphinx-build", "-b", "html", "docs", "docs/_build"])
git_push: px.TaskSpec = px.TaskSpec("git_push", cmd=["git", "push"]) git_push: px.TaskSpec = px.TaskSpec("git_push", cmd=["git", "push"])
@@ -84,7 +87,10 @@ def main():
📦 发布命令: 📦 发布命令:
pymake pb - 发布到 PyPI (twine + hatch) pymake pb - 发布到 PyPI (twine + hatch)
💡 常用工作流: 版本管理:
pymake bump - 自动升级版本号并提交修改 (清理 + 检查 + 格式化 + git add + bumpversion)
💡 常用工作流:
1. 日常开发: pymake lint && pymake t 1. 日常开发: pymake lint && pymake t
2. 构建发布包: pymake ba 2. 构建发布包: pymake ba
3. 多版本兼容性测试: pymake tox 3. 多版本兼容性测试: pymake tox
@@ -99,26 +105,26 @@ def main():
pymake type # 类型检查 pymake type # 类型检查
""" """
runner = px.CliRunner( runner = px.CliRunner(
strategy="thread", strategy="sequential",
description="PyMake - Python 构建工具", description="PyMake - Python 构建工具",
graphs={ graphs={
# 构建命令 # 构建命令
"b": px.Graph.from_specs([uv_build]), "b": px.Graph.from_specs([uv_build]),
"bc": px.Graph.from_specs([maturin_build]), "bc": px.Graph.from_specs([maturin_build]),
"ba": px.Graph.from_specs([uv_build, maturin_build]), "ba": px.Graph.from_specs(["b", "bc"]),
# 安装命令 # 安装命令
"sync": px.Graph.from_specs([uv_sync]), "sync": px.Graph.from_specs([uv_sync]),
# 清理命令 # 清理命令
"c": px.Graph.from_specs([git_clean]), "c": px.Graph.from_specs([git_clean]),
# 开发工具 # 开发工具
"bump": px.Graph.from_specs([git_clean, typecheck, ruff_lint, ruff_format, bump]), "bump": px.Graph.from_specs(["c", "tc", git_add_all, bump]),
"cov": px.Graph.from_specs([git_clean, test_coverage]), "cov": px.Graph.from_specs([git_clean, test_coverage]),
"doc": px.Graph.from_specs([doc]), "doc": px.Graph.from_specs([doc]),
"lint": px.Graph.from_specs([ruff_lint, ruff_format]), "lint": px.Graph.from_specs([ruff_lint, ruff_format]),
"pb": px.Graph.from_specs([twine_publish, hatch_publish]), "pb": px.Graph.from_specs([twine_publish, hatch_publish]),
"t": px.Graph.from_specs([test]), "t": px.Graph.from_specs([test]),
"tf": px.Graph.from_specs([test_fast]), "tf": px.Graph.from_specs([test_fast]),
"tc": px.Graph.from_specs([typecheck, ruff_lint, ruff_format]), "tc": px.Graph.from_specs([typecheck, "lint"]),
"tox": px.Graph.from_specs([tox]), "tox": px.Graph.from_specs([tox]),
# 发布命令 # 发布命令
"p": px.Graph.from_specs([git_clean, git_push, git_push_tags]), "p": px.Graph.from_specs([git_clean, git_push, git_push_tags]),
+47 -6
View File
@@ -57,18 +57,59 @@ class Graph:
return self return self
@classmethod @classmethod
def from_specs(cls, specs: Iterable[TaskSpec[Any]]) -> Graph: def from_specs(cls, specs: Iterable[TaskSpec[Any] | str]) -> Graph:
"""从可迭代的 task spec 构建图 """从可迭代的 task spec 构建图.
先收集所有 spec,再统一校验。这意味着任务可以引用*后出现*的 先收集所有 spec,再统一校验。这意味着任务可以引用*后出现*的
依赖——顺序无关,就像声明式配置文件的读取方式。 依赖——顺序无关,就像声明式配置文件的读取方式。
支持字符串引用,允许引用其他命令图中的任务。
字符串引用将在CliRunner中解析展开。
Parameters
----------
specs : Iterable[TaskSpec[Any] | str]
TaskSpec对象或字符串引用的列表
Returns
-------
Graph
构建完成的图
Note
-----
字符串引用格式:
- "command_name" - 引用整个命令图
- "command_name.task_name" - 引用特定任务
Examples
--------
>>> graph = Graph.from_specs([
... TaskSpec("build", cmd=["uv", "build"]),
... "test", # 引用test命令图
... ])
""" """
graph = cls() graph = cls()
pending_refs: list[str] = []
for spec in specs: for spec in specs:
if spec.name in graph.specs: if isinstance(spec, str):
raise DuplicateTaskError(spec.name) # 字符串引用,稍后解析
graph.specs[spec.name] = spec pending_refs.append(spec)
graph.deps[spec.name] = spec.depends_on elif isinstance(spec, TaskSpec):
if spec.name in graph.specs:
raise DuplicateTaskError(spec.name)
graph.specs[spec.name] = spec
graph.deps[spec.name] = spec.depends_on
else:
raise TypeError(f"from_specs只接受TaskSpec或str,收到: {type(spec)}")
# 存储待解析的引用
if pending_refs:
# 使用特殊属性存储引用,稍后在CliRunner中解析
# 由于Graph是frozen dataclass,我们需要特殊处理
object.__setattr__(graph, "_pending_refs", pending_refs)
graph._validate_references() graph._validate_references()
graph.validate() graph.validate()
return graph return graph
+150
View File
@@ -114,6 +114,156 @@ class CliRunner:
if not self.graphs: if not self.graphs:
raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)") raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)")
# 解析并展开字符串引用
self._resolve_graph_refs()
def _resolve_graph_refs(self) -> None:
"""解析并展开图中的字符串引用.
支持两种引用格式:
1. "command_name" - 引用整个命令图
2. "command_name.task_name" - 引用特定任务
递归解析所有引用,直到所有图都只包含TaskSpec对象。
"""
resolved_graphs: dict[str, Graph] = {}
for cmd_name, graph in self.graphs.items():
resolved_graph = self._expand_refs(graph, cmd_name)
resolved_graphs[cmd_name] = resolved_graph
# 更新graphs字典
object.__setattr__(self, "graphs", resolved_graphs)
def _expand_refs(self, graph: Graph, current_cmd: str) -> Graph:
"""展开图中的字符串引用.
Parameters
----------
graph : Graph
包含可能的字符串引用的图
current_cmd : str
当前命令名(用于避免循环引用)
Returns
-------
Graph
展开后的图,只包含TaskSpec对象
Note
-----
引用按顺序展开,后续引用的任务依赖于前面引用的任务完成。
例如:["c", "tc", bump] 会展开为:
- c的所有任务(无依赖)
- tc的所有任务(依赖于c的最后一个任务)
- bump任务(依赖于tc的最后一个任务)
"""
# 检查是否有待解析的引用
pending_refs = getattr(graph, "_pending_refs", None)
if not pending_refs:
return graph
# 收集所有TaskSpec(按正确顺序:先引用,后原始TaskSpec)
all_specs: list[TaskSpec[Any]] = []
# 记录每个引用展开后的所有任务名,用于建立依赖链
previous_ref_last_task: str | None = None
# 先解析每个引用,并建立依赖关系
for ref in pending_refs:
expanded_specs = self._parse_ref(ref, current_cmd)
# 如果有前面的引用,让当前引用的所有任务依赖于前面引用的最后一个任务
if previous_ref_last_task and expanded_specs:
# 为当前引用的每个任务添加依赖
for i, task in enumerate(expanded_specs):
# 只为没有依赖的任务添加依赖,或者为第一个任务添加依赖
if i == 0 or not task.depends_on:
updated_task = replace(task, depends_on=tuple({*task.depends_on, previous_ref_last_task}))
expanded_specs[i] = updated_task
# 记录当前引用的最后一个任务名
if expanded_specs:
previous_ref_last_task = expanded_specs[-1].name
all_specs.extend(expanded_specs)
# 然后添加原始图中的TaskSpec,并让它们按顺序执行
original_specs = list(graph.all_specs().values())
if original_specs:
# 第一个原始TaskSpec依赖于最后一个引用的任务
if previous_ref_last_task:
first_original = original_specs[0]
updated_first = replace(
first_original, depends_on=tuple({*first_original.depends_on, previous_ref_last_task})
)
all_specs.append(updated_first)
else:
# 如果没有引用,直接添加第一个原始TaskSpec
all_specs.append(original_specs[0])
# 后续的原始TaskSpec依赖于前一个原始TaskSpec
for i in range(1, len(original_specs)):
current_task = original_specs[i]
previous_task_name = original_specs[i - 1].name
# 更新依赖,确保顺序执行
updated_task = replace(current_task, depends_on=tuple({*current_task.depends_on, previous_task_name}))
all_specs.append(updated_task)
# 创建新的图(不包含引用)
return Graph.from_specs(all_specs)
def _parse_ref(self, ref: str, current_cmd: str) -> list[TaskSpec[Any]]:
"""解析单个字符串引用.
Parameters
----------
ref : str
引用字符串(如"tc""tc.lint"
current_cmd : str
当前命令名(用于避免循环引用)
Returns
-------
list[TaskSpec[Any]]
解析后的TaskSpec列表
Raises
------
ValueError
如果引用无效或存在循环引用
"""
# 避免循环引用
if ref == current_cmd:
raise ValueError(f"循环引用: 命令 '{current_cmd}' 引用了自己")
# 解析引用格式
if "." in ref:
# 特定任务引用: "command_name.task_name"
cmd_name, task_name = ref.split(".", 1)
if cmd_name not in self.graphs:
raise ValueError(f"引用的命令 '{cmd_name}' 不存在")
# 获取特定任务
ref_graph = self.graphs[cmd_name]
if task_name not in ref_graph.all_specs():
raise ValueError(f"任务 '{task_name}' 不存在于命令 '{cmd_name}'")
return [ref_graph.all_specs()[task_name]]
else:
# 整个命令图引用: "command_name"
cmd_name = ref
if cmd_name not in self.graphs:
raise ValueError(f"引用的命令 '{cmd_name}' 不存在")
# 获取整个图的所有任务
ref_graph = self.graphs[cmd_name]
# 递归展开引用(如果引用的图也有引用)
ref_graph = self._expand_refs(ref_graph, cmd_name)
return list(ref_graph.all_specs().values())
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# 内省 # 内省
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
+2 -4
View File
@@ -90,16 +90,14 @@ class TestInstallEmbedPython:
output_dir = tmp_path / "python" output_dir = tmp_path / "python"
# Create a mock cache file that doesn't exist (force download) # Create a mock cache file that doesn't exist (force download)
with patch("urllib.request.urlretrieve") as mock_urlretrieve, \ with patch("urllib.request.urlretrieve") as mock_urlretrieve, patch("zipfile.ZipFile") as mock_zipfile:
patch("zipfile.ZipFile") as mock_zipfile:
# Mock successful download # Mock successful download
mock_urlretrieve.return_value = None mock_urlretrieve.return_value = None
mock_zip_instance = MagicMock() mock_zip_instance = MagicMock()
mock_zipfile.return_value.__enter__.return_value = mock_zip_instance mock_zipfile.return_value.__enter__.return_value = mock_zip_instance
# Ensure cache doesn't exist by using tmp_path as cache dir # Ensure cache doesn't exist by using tmp_path as cache dir
with patch.object(packtool, 'DEFAULT_CACHE_DIR', str(tmp_path / ".cache")): with patch.object(packtool, "DEFAULT_CACHE_DIR", str(tmp_path / ".cache")):
packtool.install_embed_python("3.10", output_dir) packtool.install_embed_python("3.10", output_dir)
# Verify download was called # Verify download was called
+499
View File
@@ -0,0 +1,499 @@
"""Tests for command reference feature in CliRunner."""
from __future__ import annotations
import pytest
import pyflowx as px
class TestCommandReferences:
"""Test string references in Graph.from_specs."""
def test_simple_command_reference(self) -> None:
"""Should expand simple command reference."""
build_task = px.TaskSpec("build", cmd=["echo", "building"])
test_task = px.TaskSpec("test", cmd=["echo", "testing"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"build": px.Graph.from_specs([build_task]),
"test": px.Graph.from_specs([test_task]),
"all": px.Graph.from_specs([build_task, "test"]),
},
)
# Check that 'all' command has both tasks
all_tasks = list(runner.graphs["all"].all_specs().keys())
assert "build" in all_tasks
assert "test" in all_tasks
assert len(all_tasks) == 2
def test_multiple_command_references(self) -> None:
"""Should expand multiple command references."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"])
task3 = px.TaskSpec("task3", cmd=["echo", "3"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs([task2]),
"cmd3": px.Graph.from_specs([task3]),
"all": px.Graph.from_specs(["cmd1", "cmd2", "cmd3"]),
},
)
# Check that 'all' command has all tasks
all_tasks = list(runner.graphs["all"].all_specs().keys())
assert set(all_tasks) == {"task1", "task2", "task3"}
def test_specific_task_reference(self) -> None:
"""Should expand specific task reference."""
lint_task = px.TaskSpec("lint", cmd=["echo", "linting"])
format_task = px.TaskSpec("format", cmd=["echo", "formatting"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"lint": px.Graph.from_specs([lint_task, format_task]),
"quick": px.Graph.from_specs(["lint.lint"]),
},
)
# Check that 'quick' command only has lint task
quick_tasks = list(runner.graphs["quick"].all_specs().keys())
assert quick_tasks == ["lint"]
def test_nested_command_reference(self) -> None:
"""Should expand nested command references."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"])
task3 = px.TaskSpec("task3", cmd=["echo", "3"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs(["cmd1", task2]),
"cmd3": px.Graph.from_specs(["cmd2", task3]),
},
)
# Check that 'cmd3' has all tasks
cmd3_tasks = list(runner.graphs["cmd3"].all_specs().keys())
assert set(cmd3_tasks) == {"task1", "task2", "task3"}
def test_circular_reference_error(self) -> None:
"""Should raise error for circular references."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
with pytest.raises(ValueError, match="循环引用"):
px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs(["cmd1", task1]),
},
)
def test_invalid_command_reference_error(self) -> None:
"""Should raise error for invalid command reference."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
with pytest.raises(ValueError, match="引用的命令 'invalid' 不存在"):
px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs(["invalid", task1]),
},
)
def test_invalid_task_reference_error(self) -> None:
"""Should raise error for invalid task reference."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
with pytest.raises(ValueError, match="任务 'invalid' 不存在于命令 'cmd1'"):
px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs(["cmd1.invalid"]),
},
)
def test_reference_preserves_dependencies(self) -> None:
"""Should preserve dependencies when expanding references."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"], depends_on=("task1",))
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1, task2]),
"cmd2": px.Graph.from_specs(["cmd1"]),
},
)
# Check that dependencies are preserved
cmd2_deps = runner.graphs["cmd2"].deps
assert cmd2_deps["task2"] == ("task1",)
def test_mixed_references_and_tasks(self) -> None:
"""Should handle mixed references and direct tasks."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"])
task3 = px.TaskSpec("task3", cmd=["echo", "3"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1, task2]),
"cmd2": px.Graph.from_specs(["cmd1", task3]),
},
)
# Check that 'cmd2' has all tasks
cmd2_tasks = list(runner.graphs["cmd2"].all_specs().keys())
assert set(cmd2_tasks) == {"task1", "task2", "task3"}
def test_execution_order_with_references(self) -> None:
"""Should execute references in correct order."""
task1 = px.TaskSpec("task1", cmd=["echo", "step1"])
task2 = px.TaskSpec("task2", cmd=["echo", "step2"])
task3 = px.TaskSpec("task3", cmd=["echo", "step3"])
task4 = px.TaskSpec("task4", cmd=["echo", "step4"])
task5 = px.TaskSpec("task5", cmd=["echo", "step5"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs([task2, task3]),
"cmd3": px.Graph.from_specs([task4]),
"ordered": px.Graph.from_specs(["cmd1", "cmd2", "cmd3", task5]),
},
)
# Check execution order through layers
layers = runner.graphs["ordered"].layers()
# Layer 1 should have task1 (cmd1)
assert "task1" in layers[0]
# Layer 2 should have task2 and task3 (cmd2)
assert "task2" in layers[1]
assert "task3" in layers[1]
# Layer 3 should have task4 (cmd3)
assert "task4" in layers[2]
# Layer 4 should have task5 (original task)
assert "task5" in layers[3]
# Verify total layers
assert len(layers) == 4
def test_execution_order_multiple_original_tasks(self) -> None:
"""Should execute multiple original TaskSpecs in correct order."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"])
task3 = px.TaskSpec("task3", cmd=["echo", "3"])
task4 = px.TaskSpec("task4", cmd=["echo", "4"])
task5 = px.TaskSpec("task5", cmd=["echo", "5"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs([task2]),
"all": px.Graph.from_specs(["cmd1", "cmd2", task3, task4, task5]),
},
)
# Check execution order through layers
layers = runner.graphs["all"].layers()
# Layer 1: task1 (cmd1)
assert "task1" in layers[0]
# Layer 2: task2 (cmd2)
assert "task2" in layers[1]
# Layer 3: task3 (first original TaskSpec)
assert "task3" in layers[2]
# Layer 4: task4 (second original TaskSpec)
assert "task4" in layers[3]
# Layer 5: task5 (third original TaskSpec)
assert "task5" in layers[4]
# Verify total layers
assert len(layers) == 5
def test_execution_order_with_internal_dependencies(self) -> None:
"""Should preserve internal dependencies within referenced commands."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"], depends_on=("task1",))
task3 = px.TaskSpec("task3", cmd=["echo", "3"])
task4 = px.TaskSpec("task4", cmd=["echo", "4"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1, task2]),
"cmd2": px.Graph.from_specs([task3]),
"all": px.Graph.from_specs(["cmd1", "cmd2", task4]),
},
)
# Check execution order through layers
layers = runner.graphs["all"].layers()
# Layer 1: task1
assert "task1" in layers[0]
# Layer 2: task2 (depends on task1)
assert "task2" in layers[1]
# Layer 3: task3 (cmd2, depends on task2)
assert "task3" in layers[2]
# Layer 4: task4 (original TaskSpec, depends on task3)
assert "task4" in layers[3]
# Verify total layers
assert len(layers) == 4
def test_execution_order_pymake_bump_scenario(self) -> None:
"""Should execute pymake bump command in correct order."""
# Simulate pymake bump scenario
git_clean = px.TaskSpec("git_clean", cmd=["echo", "clean"])
typecheck = px.TaskSpec("typecheck", cmd=["echo", "typecheck"])
lint = px.TaskSpec("lint", cmd=["echo", "lint"])
format_task = px.TaskSpec("format", cmd=["echo", "format"], depends_on=("lint",))
git_add_all = px.TaskSpec("git_add_all", cmd=["echo", "git add -A"])
bump = px.TaskSpec("bumpversion", cmd=["echo", "bumpversion -t"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"c": px.Graph.from_specs([git_clean]),
"tc": px.Graph.from_specs([typecheck, "lint"]),
"lint": px.Graph.from_specs([lint, format_task]),
"bump": px.Graph.from_specs(["c", "tc", git_add_all, bump]),
},
)
# Check execution order through layers
layers = runner.graphs["bump"].layers()
# Layer 1: git_clean (c)
assert "git_clean" in layers[0]
# Layer 2: lint (tc.lint, depends on git_clean)
assert "lint" in layers[1]
# Layer 3: format (tc.lint.format, depends on lint)
assert "format" in layers[2]
# Layer 4: typecheck (tc.typecheck, depends on format)
assert "typecheck" in layers[3]
# Layer 5: git_add_all (original TaskSpec, depends on typecheck)
assert "git_add_all" in layers[4]
# Layer 6: bumpversion (original TaskSpec, depends on git_add_all)
assert "bumpversion" in layers[5]
# Verify total layers
assert len(layers) == 6
def test_execution_order_only_references(self) -> None:
"""Should execute only references without original TaskSpecs."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"])
task3 = px.TaskSpec("task3", cmd=["echo", "3"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs([task2]),
"cmd3": px.Graph.from_specs([task3]),
"all": px.Graph.from_specs(["cmd1", "cmd2", "cmd3"]),
},
)
# Check execution order through layers
layers = runner.graphs["all"].layers()
# Layer 1: task1 (cmd1)
assert "task1" in layers[0]
# Layer 2: task2 (cmd2, depends on task1)
assert "task2" in layers[1]
# Layer 3: task3 (cmd3, depends on task2)
assert "task3" in layers[2]
# Verify total layers
assert len(layers) == 3
def test_execution_order_only_original_tasks(self) -> None:
"""Should execute only original TaskSpecs without references."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"])
task3 = px.TaskSpec("task3", cmd=["echo", "3"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"all": px.Graph.from_specs([task1, task2, task3]),
},
)
# Check execution order through layers
layers = runner.graphs["all"].layers()
# All tasks should be in layer 1 (no dependencies)
assert "task1" in layers[0]
assert "task2" in layers[0]
assert "task3" in layers[0]
# Verify total layers
assert len(layers) == 1
def test_execution_order_single_reference(self) -> None:
"""Should execute single reference correctly."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1, task2]),
"all": px.Graph.from_specs(["cmd1"]),
},
)
# Check execution order through layers
layers = runner.graphs["all"].layers()
# Should have the same structure as cmd1
assert "task1" in layers[0]
assert "task2" in layers[0]
# Verify total layers
assert len(layers) == 1
def test_execution_order_deep_nesting(self) -> None:
"""Should execute deeply nested references correctly."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"])
task3 = px.TaskSpec("task3", cmd=["echo", "3"])
task4 = px.TaskSpec("task4", cmd=["echo", "4"])
task5 = px.TaskSpec("task5", cmd=["echo", "5"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs(["cmd1", task2]),
"cmd3": px.Graph.from_specs(["cmd2", task3]),
"cmd4": px.Graph.from_specs(["cmd3", task4]),
"cmd5": px.Graph.from_specs(["cmd4", task5]),
},
)
# Check execution order through layers
layers = runner.graphs["cmd5"].layers()
# Should execute in order: task1 -> task2 -> task3 -> task4 -> task5
assert "task1" in layers[0]
assert "task2" in layers[1]
assert "task3" in layers[2]
assert "task4" in layers[3]
assert "task5" in layers[4]
# Verify total layers
assert len(layers) == 5
def test_execution_order_with_parallel_tasks_in_reference(self) -> None:
"""Should handle parallel tasks within referenced commands."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"])
task3 = px.TaskSpec("task3", cmd=["echo", "3"])
task4 = px.TaskSpec("task4", cmd=["echo", "4"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1, task2]), # Parallel tasks
"cmd2": px.Graph.from_specs([task3, task4]), # Parallel tasks
"all": px.Graph.from_specs(["cmd1", "cmd2"]),
},
)
# Check execution order through layers
layers = runner.graphs["all"].layers()
# Layer 1: task1 and task2 (cmd1, parallel)
assert "task1" in layers[0]
assert "task2" in layers[0]
# Layer 2: task3 and task4 (cmd2, depends on cmd1's last task)
# Note: Both task3 and task4 should depend on the last task of cmd1
assert "task3" in layers[1]
assert "task4" in layers[1]
# Verify total layers
assert len(layers) == 2
def test_execution_order_complex_mixed_scenario(self) -> None:
"""Should handle complex mixed scenario with references and TaskSpecs."""
# Create a complex scenario
clean = px.TaskSpec("clean", cmd=["echo", "clean"])
build1 = px.TaskSpec("build1", cmd=["echo", "build1"])
build2 = px.TaskSpec("build2", cmd=["echo", "build2"], depends_on=("build1",))
test1 = px.TaskSpec("test1", cmd=["echo", "test1"])
test2 = px.TaskSpec("test2", cmd=["echo", "test2"])
package = px.TaskSpec("package", cmd=["echo", "package"])
deploy = px.TaskSpec("deploy", cmd=["echo", "deploy"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"clean": px.Graph.from_specs([clean]),
"build": px.Graph.from_specs([build1, build2]),
"test": px.Graph.from_specs([test1, test2]),
"release": px.Graph.from_specs(["clean", "build", "test", package, deploy]),
},
)
# Check execution order through layers
layers = runner.graphs["release"].layers()
# Layer 1: clean
assert "clean" in layers[0]
# Layer 2: build1 (depends on clean)
assert "build1" in layers[1]
# Layer 3: build2 (depends on build1)
assert "build2" in layers[2]
# Layer 4: test1 and test2 (depends on build2)
assert "test1" in layers[3]
assert "test2" in layers[3]
# Layer 5: package (depends on test1/test2)
assert "package" in layers[4]
# Layer 6: deploy (depends on package)
assert "deploy" in layers[5]
# Verify total layers
assert len(layers) == 6