1 Commits

Author SHA1 Message Date
zhou 9e99a1f1ba ~
Release / Pre-release Check (push) Failing after 31s
Release / Build Artifacts (push) Has been skipped
Release / Publish to PyPI (push) Has been skipped
Release / Publish Release (push) Has been skipped
2026-06-25 12:35:27 +08:00
7 changed files with 402 additions and 23 deletions
+1 -1
View File
@@ -17,7 +17,7 @@ license = { text = "MIT" }
name = "pyflowx"
readme = "README.md"
requires-python = ">=3.8"
version = "0.1.8"
version = "0.1.10"
[project.scripts]
autofmt = "pyflowx.cli.autofmt:main"
+1 -1
View File
@@ -84,7 +84,7 @@ from .runner import CliExitCode, CliRunner
from .storage import JSONBackend, MemoryBackend, StateBackend
from .task import TaskCmd, TaskEvent, TaskResult, TaskSpec, TaskStatus
__version__ = "0.1.8"
__version__ = "0.1.10"
__all__ = [
"IS_LINUX",
+8 -6
View File
@@ -20,13 +20,15 @@ def maturin_build_cmd() -> list[str]:
"""
command = ["maturin", "build", "-r"].copy()
if Constants.IS_WINDOWS:
command.extend([
command.extend(
[
"--target",
"x86_64-win7-windows-msvc",
"-Zbuild-std",
"-i",
"python3.8",
])
]
)
return command
@@ -99,26 +101,26 @@ def main():
pymake type # 类型检查
"""
runner = px.CliRunner(
strategy="thread",
strategy="sequential",
description="PyMake - Python 构建工具",
graphs={
# 构建命令
"b": px.Graph.from_specs([uv_build]),
"bc": px.Graph.from_specs([maturin_build]),
"ba": px.Graph.from_specs([uv_build, maturin_build]),
"ba": px.Graph.from_specs(["b", "bc"]),
# 安装命令
"sync": px.Graph.from_specs([uv_sync]),
# 清理命令
"c": px.Graph.from_specs([git_clean]),
# 开发工具
"bump": px.Graph.from_specs([git_clean, typecheck, ruff_lint, ruff_format, bump]),
"bump": px.Graph.from_specs(["c", "tc", bump]),
"cov": px.Graph.from_specs([git_clean, test_coverage]),
"doc": px.Graph.from_specs([doc]),
"lint": px.Graph.from_specs([ruff_lint, ruff_format]),
"pb": px.Graph.from_specs([twine_publish, hatch_publish]),
"t": px.Graph.from_specs([test]),
"tf": px.Graph.from_specs([test_fast]),
"tc": px.Graph.from_specs([typecheck, ruff_lint, ruff_format]),
"tc": px.Graph.from_specs([typecheck, "lint"]),
"tox": px.Graph.from_specs([tox]),
# 发布命令
"p": px.Graph.from_specs([git_clean, git_push, git_push_tags]),
+43 -2
View File
@@ -57,18 +57,59 @@ class Graph:
return self
@classmethod
def from_specs(cls, specs: Iterable[TaskSpec[Any]]) -> Graph:
"""从可迭代的 task spec 构建图
def from_specs(cls, specs: Iterable[TaskSpec[Any] | str]) -> Graph:
"""从可迭代的 task spec 构建图.
先收集所有 spec,再统一校验。这意味着任务可以引用*后出现*的
依赖——顺序无关,就像声明式配置文件的读取方式。
支持字符串引用,允许引用其他命令图中的任务。
字符串引用将在CliRunner中解析展开。
Parameters
----------
specs : Iterable[TaskSpec[Any] | str]
TaskSpec对象或字符串引用的列表
Returns
-------
Graph
构建完成的图
Note
-----
字符串引用格式:
- "command_name" - 引用整个命令图
- "command_name.task_name" - 引用特定任务
Examples
--------
>>> graph = Graph.from_specs([
... TaskSpec("build", cmd=["uv", "build"]),
... "test", # 引用test命令图
... ])
"""
graph = cls()
pending_refs: list[str] = []
for spec in specs:
if isinstance(spec, str):
# 字符串引用,稍后解析
pending_refs.append(spec)
elif isinstance(spec, TaskSpec):
if spec.name in graph.specs:
raise DuplicateTaskError(spec.name)
graph.specs[spec.name] = spec
graph.deps[spec.name] = spec.depends_on
else:
raise TypeError(f"from_specs只接受TaskSpec或str,收到: {type(spec)}")
# 存储待解析的引用
if pending_refs:
# 使用特殊属性存储引用,稍后在CliRunner中解析
# 由于Graph是frozen dataclass,我们需要特殊处理
object.__setattr__(graph, "_pending_refs", pending_refs)
graph._validate_references()
graph.validate()
return graph
+142
View File
@@ -114,6 +114,148 @@ class CliRunner:
if not self.graphs:
raise ValueError("CliRunner 至少需要一个命令 (通过关键字参数提供)")
# 解析并展开字符串引用
self._resolve_graph_refs()
def _resolve_graph_refs(self) -> None:
"""解析并展开图中的字符串引用.
支持两种引用格式:
1. "command_name" - 引用整个命令图
2. "command_name.task_name" - 引用特定任务
递归解析所有引用,直到所有图都只包含TaskSpec对象。
"""
resolved_graphs: dict[str, Graph] = {}
for cmd_name, graph in self.graphs.items():
resolved_graph = self._expand_refs(graph, cmd_name)
resolved_graphs[cmd_name] = resolved_graph
# 更新graphs字典
object.__setattr__(self, "graphs", resolved_graphs)
def _expand_refs(self, graph: Graph, current_cmd: str) -> Graph:
"""展开图中的字符串引用.
Parameters
----------
graph : Graph
包含可能的字符串引用的图
current_cmd : str
当前命令名(用于避免循环引用)
Returns
-------
Graph
展开后的图,只包含TaskSpec对象
Note
-----
引用按顺序展开,后续引用的任务依赖于前面引用的任务完成。
例如:["c", "tc", bump] 会展开为:
- c的所有任务(无依赖)
- tc的所有任务(依赖于c的最后一个任务)
- bump任务(依赖于tc的最后一个任务)
"""
# 检查是否有待解析的引用
pending_refs = getattr(graph, "_pending_refs", None)
if not pending_refs:
return graph
# 收集所有TaskSpec(包括原始图中的)
all_specs: list[TaskSpec[Any]] = []
for spec in graph.all_specs().values():
all_specs.append(spec)
# 记录每个引用展开后的所有任务名,用于建立依赖链
previous_ref_last_task: str | None = None
# 解析每个引用,并建立依赖关系
for ref in pending_refs:
expanded_specs = self._parse_ref(ref, current_cmd)
# 如果有前面的引用,让当前引用的所有任务依赖于前面引用的最后一个任务
if previous_ref_last_task and expanded_specs:
# 为当前引用的每个任务添加依赖
for i, task in enumerate(expanded_specs):
# 只为没有依赖的任务添加依赖,或者为第一个任务添加依赖
if i == 0 or not task.depends_on:
updated_task = replace(task, depends_on=tuple({*task.depends_on, previous_ref_last_task}))
expanded_specs[i] = updated_task
# 记录当前引用的最后一个任务名
if expanded_specs:
previous_ref_last_task = expanded_specs[-1].name
all_specs.extend(expanded_specs)
# 如果原始图中有TaskSpec,让它们依赖于最后一个引用的任务
original_specs = list(graph.all_specs().values())
if previous_ref_last_task and original_specs:
# 为每个原始TaskSpec添加依赖
for i, original_task in enumerate(original_specs):
# 只为第一个原始任务添加依赖,或者为没有依赖的任务添加依赖
if i == 0 or not original_task.depends_on:
updated_task = replace(
original_task, depends_on=tuple({*original_task.depends_on, previous_ref_last_task})
)
all_specs[all_specs.index(original_task)] = updated_task
# 创建新的图(不包含引用)
return Graph.from_specs(all_specs)
def _parse_ref(self, ref: str, current_cmd: str) -> list[TaskSpec[Any]]:
"""解析单个字符串引用.
Parameters
----------
ref : str
引用字符串(如"tc""tc.lint"
current_cmd : str
当前命令名(用于避免循环引用)
Returns
-------
list[TaskSpec[Any]]
解析后的TaskSpec列表
Raises
------
ValueError
如果引用无效或存在循环引用
"""
# 避免循环引用
if ref == current_cmd:
raise ValueError(f"循环引用: 命令 '{current_cmd}' 引用了自己")
# 解析引用格式
if "." in ref:
# 特定任务引用: "command_name.task_name"
cmd_name, task_name = ref.split(".", 1)
if cmd_name not in self.graphs:
raise ValueError(f"引用的命令 '{cmd_name}' 不存在")
# 获取特定任务
ref_graph = self.graphs[cmd_name]
if task_name not in ref_graph.all_specs():
raise ValueError(f"任务 '{task_name}' 不存在于命令 '{cmd_name}'")
return [ref_graph.all_specs()[task_name]]
else:
# 整个命令图引用: "command_name"
cmd_name = ref
if cmd_name not in self.graphs:
raise ValueError(f"引用的命令 '{cmd_name}' 不存在")
# 获取整个图的所有任务
ref_graph = self.graphs[cmd_name]
# 递归展开引用(如果引用的图也有引用)
ref_graph = self._expand_refs(ref_graph, cmd_name)
return list(ref_graph.all_specs().values())
# ------------------------------------------------------------------ #
# 内省
# ------------------------------------------------------------------ #
+2 -4
View File
@@ -90,16 +90,14 @@ class TestInstallEmbedPython:
output_dir = tmp_path / "python"
# Create a mock cache file that doesn't exist (force download)
with patch("urllib.request.urlretrieve") as mock_urlretrieve, \
patch("zipfile.ZipFile") as mock_zipfile:
with patch("urllib.request.urlretrieve") as mock_urlretrieve, patch("zipfile.ZipFile") as mock_zipfile:
# Mock successful download
mock_urlretrieve.return_value = None
mock_zip_instance = MagicMock()
mock_zipfile.return_value.__enter__.return_value = mock_zip_instance
# Ensure cache doesn't exist by using tmp_path as cache dir
with patch.object(packtool, 'DEFAULT_CACHE_DIR', str(tmp_path / ".cache")):
with patch.object(packtool, "DEFAULT_CACHE_DIR", str(tmp_path / ".cache")):
packtool.install_embed_python("3.10", output_dir)
# Verify download was called
+196
View File
@@ -0,0 +1,196 @@
"""Tests for command reference feature in CliRunner."""
from __future__ import annotations
import pytest
import pyflowx as px
class TestCommandReferences:
"""Test string references in Graph.from_specs."""
def test_simple_command_reference(self) -> None:
"""Should expand simple command reference."""
build_task = px.TaskSpec("build", cmd=["echo", "building"])
test_task = px.TaskSpec("test", cmd=["echo", "testing"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"build": px.Graph.from_specs([build_task]),
"test": px.Graph.from_specs([test_task]),
"all": px.Graph.from_specs([build_task, "test"]),
},
)
# Check that 'all' command has both tasks
all_tasks = list(runner.graphs["all"].all_specs().keys())
assert "build" in all_tasks
assert "test" in all_tasks
assert len(all_tasks) == 2
def test_multiple_command_references(self) -> None:
"""Should expand multiple command references."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"])
task3 = px.TaskSpec("task3", cmd=["echo", "3"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs([task2]),
"cmd3": px.Graph.from_specs([task3]),
"all": px.Graph.from_specs(["cmd1", "cmd2", "cmd3"]),
},
)
# Check that 'all' command has all tasks
all_tasks = list(runner.graphs["all"].all_specs().keys())
assert set(all_tasks) == {"task1", "task2", "task3"}
def test_specific_task_reference(self) -> None:
"""Should expand specific task reference."""
lint_task = px.TaskSpec("lint", cmd=["echo", "linting"])
format_task = px.TaskSpec("format", cmd=["echo", "formatting"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"lint": px.Graph.from_specs([lint_task, format_task]),
"quick": px.Graph.from_specs(["lint.lint"]),
},
)
# Check that 'quick' command only has lint task
quick_tasks = list(runner.graphs["quick"].all_specs().keys())
assert quick_tasks == ["lint"]
def test_nested_command_reference(self) -> None:
"""Should expand nested command references."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"])
task3 = px.TaskSpec("task3", cmd=["echo", "3"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs(["cmd1", task2]),
"cmd3": px.Graph.from_specs(["cmd2", task3]),
},
)
# Check that 'cmd3' has all tasks
cmd3_tasks = list(runner.graphs["cmd3"].all_specs().keys())
assert set(cmd3_tasks) == {"task1", "task2", "task3"}
def test_circular_reference_error(self) -> None:
"""Should raise error for circular references."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
with pytest.raises(ValueError, match="循环引用"):
px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs(["cmd1", task1]),
},
)
def test_invalid_command_reference_error(self) -> None:
"""Should raise error for invalid command reference."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
with pytest.raises(ValueError, match="引用的命令 'invalid' 不存在"):
px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs(["invalid", task1]),
},
)
def test_invalid_task_reference_error(self) -> None:
"""Should raise error for invalid task reference."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
with pytest.raises(ValueError, match="任务 'invalid' 不存在于命令 'cmd1'"):
px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs(["cmd1.invalid"]),
},
)
def test_reference_preserves_dependencies(self) -> None:
"""Should preserve dependencies when expanding references."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"], depends_on=("task1",))
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1, task2]),
"cmd2": px.Graph.from_specs(["cmd1"]),
},
)
# Check that dependencies are preserved
cmd2_deps = runner.graphs["cmd2"].deps
assert cmd2_deps["task2"] == ("task1",)
def test_mixed_references_and_tasks(self) -> None:
"""Should handle mixed references and direct tasks."""
task1 = px.TaskSpec("task1", cmd=["echo", "1"])
task2 = px.TaskSpec("task2", cmd=["echo", "2"])
task3 = px.TaskSpec("task3", cmd=["echo", "3"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1, task2]),
"cmd2": px.Graph.from_specs(["cmd1", task3]),
},
)
# Check that 'cmd2' has all tasks
cmd2_tasks = list(runner.graphs["cmd2"].all_specs().keys())
assert set(cmd2_tasks) == {"task1", "task2", "task3"}
def test_execution_order_with_references(self) -> None:
"""Should execute references in correct order."""
task1 = px.TaskSpec("task1", cmd=["echo", "step1"])
task2 = px.TaskSpec("task2", cmd=["echo", "step2"])
task3 = px.TaskSpec("task3", cmd=["echo", "step3"])
task4 = px.TaskSpec("task4", cmd=["echo", "step4"])
task5 = px.TaskSpec("task5", cmd=["echo", "step5"])
runner = px.CliRunner(
strategy="sequential",
graphs={
"cmd1": px.Graph.from_specs([task1]),
"cmd2": px.Graph.from_specs([task2, task3]),
"cmd3": px.Graph.from_specs([task4]),
"ordered": px.Graph.from_specs(["cmd1", "cmd2", "cmd3", task5]),
},
)
# Check execution order through layers
layers = runner.graphs["ordered"].layers()
# Layer 1 should have task1 (cmd1)
assert "task1" in layers[0]
# Layer 2 should have task2 and task3 (cmd2)
assert "task2" in layers[1]
assert "task3" in layers[1]
# Layer 3 should have task4 (cmd3)
assert "task4" in layers[2]
# Layer 4 should have task5 (original task)
assert "task5" in layers[3]
# Verify total layers
assert len(layers) == 4