From d0ff7d7b4dc788b57bd521e3250c4a15c74e9f59 Mon Sep 17 00:00:00 2001 From: gooker_young Date: Sun, 28 Jun 2026 09:34:45 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E6=9B=B4=E6=96=B0=20README=20=E4=B8=8E?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=20Python=20=E5=BC=80=E5=8F=91=E8=A7=84?= =?UTF-8?q?=E8=8C=83=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 本次提交大幅完善了 PyFlowX 的 README 文档,新增了四种执行策略、软依赖、并发限制、任务钩子等多项特性说明,补充了任务模板、图组合、缓存键等新功能的使用示例,同时更新了执行参数、执行策略对照表与模块结构文档。另外新增了 .trae/rules/python-standards.md 规范文档,统一了项目的代码风格、类型检查、测试编写等开发标准。 --- .trae/rules/python-standards.md | 362 ++++++++++++++++++++++++++++++++ README.md | 163 ++++++++++++-- 2 files changed, 505 insertions(+), 20 deletions(-) create mode 100644 .trae/rules/python-standards.md diff --git a/.trae/rules/python-standards.md b/.trae/rules/python-standards.md new file mode 100644 index 0000000..28df532 --- /dev/null +++ b/.trae/rules/python-standards.md @@ -0,0 +1,362 @@ +# Python 开发规范 + +本规范结合 Python 最佳实践,作为编写与审查 Python 代码的统一标准。 + +## 工具链(以 pyproject.toml 为准) + +| 工具 | 用途 | 配置要点 | +|------|------|---------| +| **ruff** | lint + format | `line-length=120`,`target-version="py38"`,select 见 pyproject | +| **pyrefly** | 类型检查 | `preset="strict"`,`python-version="3.8"` | +| **pytest** | 测试 | `asyncio_default_fixture_loop_scope="function"`,marker `slow` | +| **coverage** | 覆盖率 | `branch=true`,`fail_under=95`,`concurrency=["thread"]` | +| **pre-commit** | 提交前检查 | ruff `--fix` + trailing-whitespace + end-of-file-fixer | + +验证(每次修改后必做): + +```bash +uvx --from pyflowx pymake tc +uvx --from pyflowx pymake cov +``` + +## 兼容性 + +- **最低 Python 3.8**:使用 `from __future__ import annotations` 延迟注解求值; + 3.9 以下用 `typing.List`/`typing.Dict`/`typing.Union`,3.9+ 用内置泛型, + 3.10+ 用 `X | Y`,3.12+ 用 `typing.override`。 +- **版本守卫**:`if sys.version_info >= (3, X):` 引入高版本 API,else 回退。 + 低版本回退分支加 `# pragma: no cover`。 +- **零运行时依赖**:仅依赖标准库(3.8 需 `graphlib_backport`、`typing-extensions`)。 + 新增依赖须审慎,优先用标准库。 + +## 类型注解 + +- **所有公共 API 必须有完整类型注解**,包括返回类型。 +- **私有函数也应有注解**,便于 pyrefly strict 通过。 +- 泛型用 `TypeVar`;使用 PEP 696 `default=` 参数时,3.13+ 用 `typing.TypeVar`, + 3.8–3.12 用 `typing_extensions.TypeVar`(`default=` 在 3.13 才进入标准库)。 +- `Mapping`/`Sequence` 用于只读参数,`dict`/`list` 用于可变返回。 +- `Any` 仅用于真正动态的场景(如 `Context` 跨任务异构映射);单个任务内部类型 + 必须完全静态,由函数签名保证。 +- 避免渐进式类型:不要用 `# type: ignore` 掩盖真实类型错误;确需时加 + 具体规则码注释(如 `# type: ignore[union-attr]`)。 +- **`TYPE_CHECKING` 守卫**:仅类型检查需要的导入(如为避免循环依赖、 + 或仅注解用的重类型)放 `if TYPE_CHECKING:` 块内,运行期不导入: + ```python + from typing import TYPE_CHECKING + + if TYPE_CHECKING: + from pyflowx.graph import Graph # 仅注解使用 + ``` +- **类型收窄**:分支内需要窄化类型时,用 `assert isinstance(x, Y)` 辅助 + pyrefly 推断;避免无谓的 `cast()`(运行期无检查,掩盖 bug)。 + `cast` 仅用于类型系统无法表达的场景(如第三方无类型存根)。 + +## 数据结构 + +- **不可变优先**:配置/描述类用 `@dataclass(frozen=True)`。 +- **可变类属性**显式标注 `RUF012` 豁免(如配置默认值 `field(default_factory=...)`)。 +- **缓存计算结果**用 `functools.cached_property`(实例级)或 + `functools.lru_cache`(按参数键控);不可哈希参数需 try/except 回退。 +- **缓存失效**:修改被缓存的数据源后必须手动清空缓存(如 `self._cache.clear()`), + 否则下游拿到陈旧数据。 +- **抽象基类**:定义接口用 `abc.ABC` + `@abstractmethod`(如 `StateBackend`), + 强制子类实现关键方法;非抽象方法提供默认实现。 +- **枚举**:状态/标志值用 `enum.Enum`(或 `IntEnum`/`Flag`,如 `TaskStatus`), + 禁止裸字符串/魔术数字;枚举值用 `UPPER_SNAKE`。 +- **`__repr__`**:可变类实现 `__repr__`(含关键字段),便于调试与日志; + `frozen=True` dataclass 自动生成,无需手写。 + +## 模块与导入 + +- **单一职责**:每个模块只做一件事(`task.py` 纯数据结构、`executors.py` 执行逻辑、 + `command.py` 命令执行、`compose.py` 图组合)。禁止跨职责边界放代码。 +- **导入顺序**(ruff isort):`__future__` → 标准库 → 第三方 → 本地,各组间空行。 +- **惰性导入**:仅为打破循环依赖时使用,在函数体内导入,加注释说明原因。 + 顶层导入是默认。 +- **`__all__`**:定义 `__all__`,显式声明导出符号。位置在模块顶部,仅次于 `__future__` 之后。 +- **禁用 star imports**:`from x import *` 污染命名空间、隐藏依赖、 + 破坏类型检查;显式列出导入符号,或用模块路径访问(`import x` → `x.foo`)。 + 例外:`__init__.py` 聚合导出时可经 `__all__` 控制。 +- **避免 `utils.py`、`helpers.py` 等杂项工具文件**:按职责归入对应模块,不要建杂项工具文件。 + +## 函数设计 + +- **模块级函数优于 Mixin**:共享逻辑用模块级函数,不要新建 Mixin 类。 + 类只持有状态与薄方法,调用模块级函数。 +- **静态方法慎用**:纯函数直接放模块级,不必塞进类里当 `@staticmethod`。 +- **参数 ≤ 5 个**为宜;超出考虑用 dataclass 封装参数对象(ruff `PLR0913` 已忽略, + 但仍是设计信号)。 +- **单一职责**:一个函数做一件事;过长函数(ruff `PLR0915` 已忽略但仍是信号) + 考虑拆分。 +- **异常范围要窄**:只捕获预期异常(如 `(TypeError, ValueError, KeyError, AttributeError)`), + **不要** `except Exception` 掩盖 bug。捕获后至少 `logger.warning` 记录。 + +## 异常处理 + +- **自定义异常家族**:继承公共基类(如 `PyFlowXError`),按错误场景分类。 +- **异常包装**:低层异常用 `raise NewError(...) from exc` 保留因果链。 +- **不要吞异常**:捕获后必须处理(记录/包装/重抛),不要空 `except: pass`。 +- **钩子/回调异常**:第三方回调异常仅记录,不影响主流程(用 `_run_hooks` 模式)。 + +## 并发与线程安全 + +- **进程全局状态**(`os.environ`/`os.chdir`)在并发场景下必须用全局锁 + (`threading.RLock`)序列化"切换→执行→恢复"区间。 +- **条件评估不可有可变状态**:组合条件(NOT/AND/OR)不得修改共享 `_reason`, + 避免竞态。 +- **批量 I/O**:循环内多次写盘改为批量一次(用 `contextmanager` 包裹延迟落盘)。 +- **信号量限流**:`concurrency_key` + `Semaphore` 按组限流,避免压垮下游。 + +## 测试 + +### 总则 + +- **覆盖率 ≥ 95%**(branch coverage),不得下降。 +- **公共 API 优先测试**:测试用公共接口(`has`/`get`),不访问私有方法 + (如 `_expired`)。兼容旧测试的私有方法应删除并迁移测试。 + 例外:`_store`/`_flush` 等内部状态在无法用公共 API 触发时(如模拟过期、 + 故障注入),可临时访问私有属性,并在 docstring 注明原因。 +- **命名**:`test_<被测对象>_<场景>`,如 `test_storage_key_cache_key_exception_returns_name`。 +- **每个测试一个断言重点**;多个断言要语义相关。 +- **slow 标记**:耗时测试加 `@pytest.mark.slow`,CI 可 `-m "not slow"` 跳过。 +- **测试代码也跑 ruff**:`tests/**` 忽略 `ARG001`/`ARG002`(未用 fixture 参数)。 +- **断言风格**:用原生 `assert` + 比较运算符(`assert x == 1`), + 不用 `self.assertEqual`;pytest 会生成更清晰的 diff。 + +### Mock 工具选择(强制) + +**优先级**:`monkeypatch` > 内联 stub > `unittest.mock` > `pytest-mock`。 + +| 场景 | 工具 | 示例 | +|------|------|------| +| 替换模块属性 / 环境变量 / 工作目录 | `monkeypatch` | `monkeypatch.setattr(subprocess, "run", fake_run)` | +| `os.environ["KEY"]` 临时设置 | `monkeypatch.setenv` | `monkeypatch.setenv("LOCALAPPDATA", "C:\\...")` | +| 切换 cwd | `monkeypatch.chdir` | `monkeypatch.chdir(tmp_path)` | +| 一次性 stub 函数 | 内联 lambda / 闭包 | `ran = []; monkeypatch.setattr(subprocess, "run", lambda *c, **__: ran.append(c))` | +| 复杂 spy(记录调用次数/参数/返回序列) | `unittest.mock.MagicMock` | 仅当 lambda 不足以表达时 | +| `with patch(...)` 上下文 | **禁用**(用 monkeypatch) | monkeypatch 自动 teardown 更安全 | + +**禁止**: +- 不用 `pytest-mock` 的 `mocker` fixture(项目虽在 dev 依赖声明,但实际 + 测试代码未使用;为保持风格统一,新代码继续用 `monkeypatch`)。 +- 不用 `unittest.mock.patch` 装饰器(`@patch("x.y")`),它隐藏依赖且 + 与 pytest fixture 模式不兼容;用 `monkeypatch.setattr` 替代。 +- 不用 `mock.patch.object` 作为上下文管理器,除非被测代码本身就是 + contextmanager(此时用 `monkeypatch.setattr` 仍更简单)。 + +### monkeypatch 使用规范 + +- **类型注解**:fixture 参数标注 `monkeypatch: pytest.MonkeyPatch`。 +- **作用域**:monkeypatch 自动在测试结束时撤销,**禁止**手动 + `monkeypatch.setattr(x, "y", original)` 恢复(多余且容易遗漏)。 + 例外:在单个测试内需要中途恢复时,用 `monkeypatch.undo()` 全量撤销。 +- **替换目标**:替换"被测代码看到的对象",而非全局对象本身。 + - 错误:`monkeypatch.setattr("os.path.exists", fake)` —— 替换全局, + 影响其他模块。 + - 正确:`monkeypatch.setattr(pyflowx.command.shutil, "which", fake)` —— + 替换被测模块引用的 `shutil.which`。 +- **属性 vs 字符串路径**:优先属性访问形式 `monkeypatch.setattr(obj, "attr", val)` + 而非字符串路径 `monkeypatch.setattr("pkg.mod.obj.attr", val)`, + 前者有 IDE 跳转与重构支持。 +- **记录调用**:用闭包 `ran: list[tuple] = []` + `lambda *a, **k: ran.append((a, k))` + 替代 `MagicMock`,可读性更好且无需导入。 + +### Stub 与 Spy 模式 + +- **轻量 stub**:内联定义 `class MockResult: returncode = 0; stdout = ""`, + 替代 `MagicMock(return_value=...)`,类型明确且不引入 mock 依赖。 +- **状态收集**:闭包 + list 比 `mock.call_args_list` 更易断言: + ```python + calls: list[list[str]] = [] + + + def fake_run(cmd: list[str], **_: Any) -> MockResult: + calls.append(cmd) + return MockResult() + + + monkeypatch.setattr(subprocess, "run", fake_run) + assert calls == [["clear"]] + ``` +- **副作用序列**:需要按调用次数返回不同值时,用 `itertools.cycle` 或 + 手动计数器,而非 `side_effect=[...]`(mock 专有 API)。 +- **异常注入**:`def raise_oserror(*a, **k): raise OSError("...")`, + 用 `pytest.raises(OSError)` 验证,而非 `side_effect=OSError`。 + +### 异常断言 + +- **`pytest.raises`**:必填 `match=` 正则(除非异常消息完全不可预测), + 避免误捕获同类异常: + ```python + with pytest.raises(StorageError, match="cannot write"): + b.save("a", 1) + ``` +- **异常链**:验证 `__cause__` 时用 `exc_info.value.__cause__`, + 确认 `raise X from Y` 因果链完整。 +- **禁止** `try/except + assert False`:用 `pytest.raises` 替代。 + +### Fixture 规范 + +- **`tmp_path`**:处理临时文件,自动清理,禁止 `tempfile.mkdtemp()` 手动管理。 +- **`monkeypatch`**:环境变量、cwd、模块属性 mock(见上)。 +- **`capsys`/`capfd`**:捕获 stdout/stderr,验证日志或命令输出。 +- **autouse fixture**:仅在全局必需时用(如 `conftest.py` 的 + `packtool_tmp_workdir` 自动切到 tmp_path);否则显式声明参数。 +- **fixture 命名**:`snake_case`,描述"提供什么"而非"测试什么" + (`sample_graph` 优于 `test_data`)。 +- **fixture 作用域**:默认 `function`;`module`/`session` 仅当构造昂贵且 + 只读时,并加注释说明无副作用。 + +### asyncio 测试 + +- **fixture `loop_scope="function"`**(pyproject 已配置默认值)。 +- **async 测试**:`async def test_x():`,pytest-asyncio 自动驱动。 +- **await 检查**:测试异步函数必须 `await` 结果,禁止仅验证返回 coroutine 对象。 +- **异步 mock**:用 `AsyncMock`(3.8+ 在 `unittest.mock`)或 + `async def fake(): return value`,禁用 `MagicMock(return_value=coro)`。 + +### 参数化 + +- **`@pytest.mark.parametrize`**:用 `ids` 参数提供可读标识: + ```python + @pytest.mark.parametrize( + ("strategy", "expected_workers"), + [("sequential", 1), ("thread", 8), ("async", 1)], + ids=["seq", "thread-8", "async"], + ) + ``` +- **参数命名**:参数元组用有意义名称,而非 `("a", "b")`。 +- **组合爆炸**:参数组合 > 20 时拆分测试,避免单个测试函数臃肿。 + +### 测试组织 + +- **文件命名**:`test_<被测模块>.py`(`test_storage.py` 对应 `storage.py`)。 +- **类分组**:仅在测试逻辑强相关时用 `class TestXxx:` 分组;默认用模块级函数。 +- **docstring**:每个测试函数一句话说明"测试什么场景",复杂场景补充"为什么"。 +- **setup/teardown**:优先 fixture;`setup_method`/`teardown_method` 仅在 + 无法用 fixture 表达时(罕见)。 + +## 代码风格 + +- **行宽 120**(ruff formatter 处理,无需手动对齐)。 +- **docstring**:公共 API 必须有 docstring;模块顶部 docstring 说明职责与设计要点。 + 中文叙述 + 英文代码注释是本项目既有风格,保持一致。 +- **命名**:`snake_case` 函数/变量,`PascalCase` 类,`UPPER_SNAKE` 常量, + `_leading_underscore` 私有。 +- **字符串引号**:ruff 默认双引号。 +- **末尾换行**:文件以单 `\n` 结尾(pre-commit `end-of-file-fixer` 强制)。 +- **无尾随空格**(pre-commit `trailing-whitespace` 强制)。 +- **不用 emoji**:除非用户明确要求。 + +## Pythonic 风格 + +- **`is` 比较 `None`/`True`/`False`**:单例用 `is`,值用 `==`。 + `if x is None:` 正确;`if x == None:` 禁止(PEP 8 E711/E712)。 +- **EAFP 优于 LBYL**:先尝试再处理异常,而非先检查再执行: + ```python + # 不好(LBYL,竞态窗口) + if os.path.exists(path): + with open(path) as f: + ... + # 好(EAFP) + try: + with open(path) as f: + ... + except FileNotFoundError: + ... + ``` + 例外:检查成本低且无竞态时(如 `if not items:` 跳过空集合)可用 LBYL。 +- **truthiness**:用 `if items:` / `if not items:` 而非 + `if len(items) > 0:` / `if items != []`。空容器/空串/0/None 均为假。 +- **字符串格式化**:首选 f-string(`f"{name=}"` 调试格式 3.8+); + `format()` 用于模板/延迟绑定;`%` 仅用于 `logging` 延迟格式化(见"日志")。 +- **推导式**:list/dict/set 推导式优于 `map`+`filter`;但推导式 > 2 层时 + 拆为显式循环(可读性优先)。 +- **`enumerate` 索引**:需要索引时用 `for i, x in enumerate(seq)`, + 禁止 `for i in range(len(seq))`。 +- **`zip` 并行**:并行迭代用 `zip(a, b)`;需要索引对齐用 + `zip(a, b, strict=True)`(3.10+,长度不等抛 ValueError)。 +- **解包**:`a, b = pair` 优于 `a = pair[0]; b = pair[1]`; + 忽略值用 `_`(`a, _ = pair`)。 +- **海象运算符 `:=`**(3.8+):赋值+判断合一,避免重复求值: + ```python + if (n := len(data)) > threshold: + logger.info("data size %d exceeds %d", n, threshold) + ``` + 但不滥用:简单赋值仍用普通 `=`。 + +## 日志 + +- **用 `logging.getLogger(__name__)`**:每个模块独立 logger,禁用 `print` 调试残留 + (提交前用 `git diff` 检查)。 +- **结构化上下文**:用 `extra={...}` 传字段,而非字符串拼接; + `logger.warning("task %r failed: %s", name, exc)` 优于 f-string(延迟格式化)。 +- **日志级别**:`DEBUG` 详细诊断、`INFO` 关键流程、`WARNING` 可恢复异常、 + `ERROR` 需人工介入。捕获预期异常至少 `logger.warning` 并附 `exc_info=True` + 当需要堆栈时。 +- **禁止日志密码/密钥**:脱敏后再记录。 + +## 可变默认参数 + +- **经典坑**:`def f(x=[])` / `def f(x={})` 的默认值在函数定义时求值一次, + 后续调用共享同一对象,导致跨调用状态泄漏。 +- **哨兵模式**:用 `None` 作默认,函数体内初始化: + ```python + def f(items: list[int] | None = None) -> list[int]: + if items is None: + items = [] + ``` +- **dataclass**:用 `field(default_factory=list)`(已在"数据结构"提及, + 普通函数同样适用此原则)。 + +## 路径处理 + +- **优先 `pathlib.Path`**:用 `Path("a") / "b"` 而非 `os.path.join("a", "b")`; + ruff `PTH` 规则已启用并强制。 +- **禁止字符串拼接路径**:`"a/" + name` 跨平台不安全(分隔符、注入风险)。 +- **类型注解**:参数与返回用 `Path`,仅在边界(如 `os.environ`、CLI argv) + 接受 `str` 后立即 `Path(s)` 包装。 +- **临时文件**:测试用 `tmp_path` fixture;生产代码用 + `tempfile.NamedTemporaryFile` 或 `contextlib.ExitStack` 管理。 + +## 资源管理 + +- **`with` 语句**:文件、锁、连接、临时目录一律用 `with` 或 + `contextlib.contextmanager`,确保异常路径也释放。 +- **自定义资源**:实现 `__enter__`/`__exit__`;多个资源用 + `contextlib.ExitStack` 动态管理。 +- **显式关闭**:长生命周期对象(连接池、线程池)实现 `close()` 并在 + `__del__` 或 `atexit` 兜底,但优先 `with`。 +- **批量操作**:循环内多次 acquire/release 改为批量一次(如 + `backend.batch()` 包裹整个执行,见"并发与线程安全")。 + +## 安全 + +- **禁用 `eval`/`exec`**:处理不可信输入时绝不使用;用 `ast.literal_eval` + 解析字面量,或专用解析器。 +- **`subprocess`**:禁用 `shell=True` 除非命令完全可信;优先 `list[str]` + 形式 `cmd`,避免 shell 注入。 +- **凭证不入仓**:密钥、token、密码放 `.env` 或环境变量,`.gitignore` + 必须包含 `.env`;代码中用 `os.environ.get("KEY")` 读取。 +- **日志脱敏**:记录请求/响应时移除 `Authorization`、`password` 等字段。 +- **依赖审计**:`uv lock` 后审阅新增依赖;避免引入已知 CVE 的包。 + +## 性能要点 + +- **避免重复计算**:循环内的 `resolved_spec`/`inspect.signature` 等查询应缓存 + 或预构建映射(如 `{name: spec}`)。 +- **避免双重查找**:`has(k)` + `get(k)` 改为单次 `get(k)` + `KeyError` 回退。 +- **统一校验**:入口校验一次,下游路径不重复校验(如 `run()` 统一 `validate()`, + `layers()` 不再重复)。 +- **事件 emit**:任务生命周期必须 emit `RUNNING` → `SUCCESS`/`FAILED`/`SKIPPED`, + 不要留死分支(`# pragma: no cover` 是清理信号,应激活或删除)。 + +## Git 与提交 + +- **不自动提交**:除非用户明确要求。 +- **不自动 push**:除非用户明确要求。 +- **不修改 git config**。 +- **不运行破坏性命令**(`push --force`/`reset --hard`/`clean -f`)除非用户明确要求。 +- **staging**:按文件名添加,不用 `git add -A`/`git add .`,避免误加敏感文件。 +- **commit message**:简洁,聚焦"为什么"而非"是什么";遵循仓库既有风格。 diff --git a/README.md b/README.md index d3de395..cceffe4 100644 --- a/README.md +++ b/README.md @@ -14,18 +14,25 @@ PyFlowX 把"任务依赖"这件事做到极致简单:**参数名就是依赖 ## 特性 - **零样板** —— 参数名即依赖,框架自动注入上游结果 -- **三种执行策略** —— `sequential`(调试)/ `thread`(I/O 密集同步)/ `async`(I/O 密集异步) +- **四种执行策略** —— `sequential`(调试)/ `thread`(I/O 密集同步)/ `async`(I/O 密集异步)/ `dependency`(依赖驱动,最大化并行) - **类型安全** —— `TaskSpec[T]` 把返回类型一路传到 `RunReport`,mypy strict 通过 - **DAG 校验** —— 构建时即时校验重名、缺失依赖、环 - **自动分层** —— Kahn 算法分组,同层任务可并行 -- **重试与超时** —— 每个任务独立配置 `retries` 与 `timeout` -- **断点续跑** —— `MemoryBackend` / `JSONBackend`,成功结果可缓存复用 +- **重试与超时** —— 每个任务独立配置 `RetryPolicy`(max_attempts/delay/backoff/jitter/retry_on)与 `timeout` +- **软依赖** —— `soft_depends_on` 仅用于上下文注入,不参与拓扑分层 +- **并发限制** —— `concurrency_key` + `concurrency_limits` 按组限流 +- **任务钩子** —— `TaskHooks`(pre_run/post_run/on_failure)生命周期回调 +- **断点续跑** —— `MemoryBackend` / `JSONBackend`,成功结果可缓存复用;`batch()` 批量落盘 +- **缓存键** —— `cache_key` 函数基于输入计算稳定键,使不同输入产生独立缓存 - **命令任务** —— `cmd` 参数直接执行外部命令,支持列表/shell/可调用对象 - **条件执行** —— `conditions` 参数按平台、环境变量、应用安装等条件跳过任务 +- **图组合** —— `compose` / `GraphComposer` 编程式展开多图字符串引用 +- **任务模板** —— `task_template` 工厂批量生成相似 TaskSpec +- **图级默认值** —— `GraphDefaults` 统一配置 retry/timeout/concurrency 等 - **CLI 运行器** —— `CliRunner` 把多个图映射为命令行子命令,替代 Makefile -- **可观测** —— `on_event` 回调、`dry_run` 预览、`verbose` 生命周期日志、Mermaid 可视化 +- **可观测** —— `on_event` 回调(RUNNING/SUCCESS/FAILED/SKIPPED)、`dry_run` 预览、`verbose` 生命周期日志、Mermaid 可视化 - **零运行时依赖** —— 仅依赖标准库(3.8 需 `graphlib_backport`) -- **95% 测试覆盖** —— 分支覆盖率>= 95% +- **97% 测试覆盖** —— 分支覆盖率 >= 95% ## 安装 @@ -67,23 +74,31 @@ print(report["double"]) # [2, 4, 6] ### TaskSpec —— 任务描述 -`TaskSpec` 是不可变的任务描述符,是唯一需要配置的东西: +`TaskSpec` 是不可变的任务描述符(`Generic[T]`,返回类型一路传到 `RunReport`),是唯一需要配置的东西: ```python px.TaskSpec( name="fetch_user", # 唯一标识 fn=fetch_user, # 同步或异步函数 cmd=["curl", "..."], # 或: 执行命令(覆盖 fn) - depends_on=("auth",), # 依赖的任务名 + depends_on=("auth",), # 硬依赖(参与拓扑分层) + soft_depends_on=("cache",), # 软依赖(仅注入,不参与分层) args=(uid,), # 静态位置参数(追加在注入参数后) kwargs={"timeout": 30}, # 静态关键字参数 - retries=3, # 失败重试次数(0 = 仅一次) + retry=px.RetryPolicy(max_attempts=3, delay=1.0, backoff=2.0), # 重试策略 timeout=30.0, # 超时秒数(None = 不限制) tags=("api", "user"), # 自由标签,用于子图过滤 conditions=(is_prod,), # 条件函数列表(全部为 True 才执行) + priority=10, # 同层内优先级(高优先执行,默认 0) + concurrency_key="db", # 并发分组键(配合 concurrency_limits 限流) + cache_key=lambda ctx: str(ctx.get("uid")), # 缓存键函数(不同输入独立缓存) + hooks=px.TaskHooks(pre_run=..., post_run=..., on_failure=...), # 生命周期钩子 cwd=Path("/tmp"), # 命令工作目录(仅 cmd 模式) + env={"DEBUG": "1"}, # 环境变量覆盖(fn 与 cmd 模式均生效) verbose=True, # 打印命令输出(仅 cmd 模式) skip_if_missing=True, # 命令不存在时自动跳过(仅 list[str] cmd) + allow_upstream_skip=False, # 上游 SKIPPED/FAILED 时是否仍执行 + continue_on_error=False, # 本任务失败是否不中断整体 ) ``` @@ -97,18 +112,54 @@ px.TaskSpec( ### Graph —— DAG 构建 ```python -graph = px.Graph.from_specs([...]) # 整批校验(推荐) +# 图级默认值:TaskSpec 字段为 None 时回退 +defaults = px.GraphDefaults(retry=px.RetryPolicy(max_attempts=2), timeout=60.0) + +graph = px.Graph.from_specs([...], defaults=defaults) # 整批校验(推荐) # 或增量构建 -graph = px.Graph() +graph = px.Graph(defaults=defaults) graph.add(px.TaskSpec("a", fn_a)) graph.add(px.TaskSpec("b", fn_b, ("a",))) graph.validate() # 显式校验(环检测) -graph.layers() # 拓扑分层 +graph.layers() # 拓扑分层(run() 入口已统一校验,直接调用需自行先 validate) graph.to_mermaid() # Mermaid 可视化 graph.describe() # 人类可读摘要 graph.subgraph(("api",)) # 按标签切片 graph.subgraph_by_names(("a", "b")) # 按名称切片 +graph.map("fetch", [1, 2, 3], lambda i: TaskSpec(f"fetch_{i}", ...)) # 批量 fan-out +``` + +### 图组合 —— compose + +`compose` / `GraphComposer` 把带字符串引用的多个图展开为纯 `Graph`: + +```python +graphs = { + "build": px.Graph.from_specs([px.TaskSpec("b", cmd=["echo", "b"])]), + "all": px.Graph.from_specs(["build", px.TaskSpec("t", cmd=["echo", "t"])]), +} +resolved = px.compose(graphs) # "all" 图中的 "build" 引用被展开 +``` + +引用格式:`"command_name"`(整个图)或 `"command_name.task_name"`(特定任务)。 +`CliRunner` 内部自动调用 `compose`。 + +### 任务模板 —— task_template + +`task_template` 工厂批量生成相似 TaskSpec: + +```python +fetch = px.task_template( + fn=fetch_url, + retry=px.RetryPolicy(max_attempts=5), + timeout=30.0, + tags=("api",), +) +graph = px.Graph.from_specs([ + fetch("users", url="https://api.example.com/users"), + fetch("posts", url="https://api.example.com/posts"), +]) ``` ### run —— 执行 @@ -116,12 +167,14 @@ graph.subgraph_by_names(("a", "b")) # 按名称切片 ```python report = px.run( graph, - strategy="async", # sequential | thread | async + strategy="async", # sequential | thread | async | dependency max_workers=8, # thread 策略的线程池大小 + concurrency_limits={"db": 2}, # 按 concurrency_key 限流 dry_run=False, # True = 仅打印计划 verbose=False, # True = 打印任务生命周期日志 - on_event=callback, # 状态转换回调 + on_event=callback, # 状态转换回调(RUNNING/SUCCESS/FAILED/SKIPPED) state=px.JSONBackend("state.json"), # 断点续跑后端 + continue_on_error=False, # True = 单任务失败不中断整体 ) ``` @@ -141,7 +194,7 @@ report.describe() # 人类可读报告 按顺序求值: 1. **标注为 `Context`** 的参数 → 接收完整上游结果映射 -2. **名称匹配依赖** 的参数 → 接收该依赖的结果 +2. **名称匹配依赖** 的参数 → 接收该依赖的结果(含软依赖,缺失时注入默认值) 3. **`**kwargs`** 参数 → 接收所有依赖结果(dict) 4. **`TaskSpec.args` / `kwargs`** → 为非依赖参数提供静态值 @@ -170,8 +223,11 @@ def fetch_user(uid: int) -> dict: # uid 来自 TaskSpec.args | `sequential` | 串行 | 调试、CPU 密集 | 直接调用 | 事件循环 | | `thread` | 线程池 | I/O 密集同步 | 线程池 | 不支持 | | `async` | 事件循环 | I/O 密集异步 | 卸载到线程池 | 事件循环 | +| `dependency` | 依赖驱动 | 最大化并行度 | 卸载到线程池 | 事件循环 | -所有策略都遵循 `retries`、`timeout`、上下文注入、状态后端,并发出 `TaskEvent`。 +所有策略都遵循 `RetryPolicy`、`timeout`、上下文注入、状态后端、`concurrency_limits`, +并发出 `TaskEvent`(RUNNING/SUCCESS/FAILED/SKIPPED)。`dependency` 策略无层屏障: +任务在其所有硬依赖完成后立即启动。 ## 命令任务 @@ -275,12 +331,25 @@ python examples/async_aggregation.py from pyflowx import JSONBackend # 第一次运行:成功结果写入 state.json -backend = JSONBackend("state.json") +backend = JSONBackend("state.json", ttl=3600) # ttl 秒数,过期条目自动忽略 report = px.run(graph, strategy="sequential", state=backend) -# 第二次运行:已缓存任务自动跳过 +# 第二次运行:已缓存任务自动跳过(状态为 SKIPPED) report = px.run(graph, strategy="sequential", state=backend) -# report.results 中缓存任务状态为 SKIPPED +``` + +`run()` 内部以 `backend.batch()` 包裹整个执行:所有 `save` 延迟到运行结束时统一落盘一次 +(`JSONBackend` 从 O(N²) 降为 O(N) 磁盘写入;`MemoryBackend` 为 no-op)。 + +**缓存键**:默认存储键为任务名。配置 `cache_key` 函数后,键为 `"name:cache_key_value"`, +使不同输入产生独立缓存条目: + +```python +px.TaskSpec( + "fetch_user", + fn=fetch_user, + cache_key=lambda ctx: str(ctx.get("uid")), # 不同 uid 独立缓存 +) ``` ## 错误处理 @@ -321,14 +390,52 @@ except px.PyFlowXError: PyFlowX 专注于**单机 DAG 调度**的极致简洁,适合 ETL、数据处理、CI 流水线等场景。 +## 高级特性 + +### 并发限制 + +按 `concurrency_key` 分组限流,避免压垮下游资源: + +```python +graph = px.Graph.from_specs([ + px.TaskSpec("q1", fn=query_db, concurrency_key="db"), + px.TaskSpec("q2", fn=query_db, concurrency_key="db"), + px.TaskSpec("q3", fn=query_db, concurrency_key="db"), +]) +# 同一时刻最多 2 个 "db" 组任务运行 +px.run(graph, strategy="async", concurrency_limits={"db": 2}) +``` + +### 任务钩子 + +`TaskHooks` 在任务生命周期触发(异常仅记录,不影响任务状态): + +```python +hooks = px.TaskHooks( + pre_run=lambda spec: print(f"start {spec.name}"), + post_run=lambda spec, value: print(f"done {spec.name}"), + on_failure=lambda spec, exc: alert(spec.name, exc), +) +px.TaskSpec("task", fn=work, hooks=hooks) +``` + +### 优先级 + +同层内按 `priority` 降序执行(稳定排序): + +```python +px.TaskSpec("low", fn=work, priority=0) +px.TaskSpec("high", fn=work, priority=10) # 同层内先执行 +``` + ## 开发 ```bash # 安装开发依赖 uv sync --extra dev -# 运行测试(含覆盖率) -uv run pytest --cov=pyflowx --cov-fail-under=100 +# 运行测试(含覆盖率,阈值 95%) +uv run pytest --cov=pyflowx --cov-fail-under=95 # 类型检查 uv run mypy @@ -338,6 +445,22 @@ uv run ruff check src tests examples uv run ruff format --check src tests examples ``` +## 模块结构 + +| 模块 | 职责 | +|------|------| +| `task.py` | 纯数据结构:`TaskSpec`、`RetryPolicy`、`TaskHooks`、`TaskStatus` | +| `graph.py` | DAG 构建、校验、分层、可视化 | +| `compose.py` | 多图组合:`GraphComposer` / `compose` | +| `context.py` | 上下文注入:参数名→依赖解析 | +| `command.py` | 命令执行:`run_command`(list/shell/Callable) | +| `conditions.py` | 条件执行:内置条件与组合器 | +| `executors.py` | 执行器与 `run` 入口:四种策略共享模块级辅助 | +| `storage.py` | 状态后端:`MemoryBackend` / `JSONBackend`(batch flush) | +| `runner.py` | CLI 运行器:`CliRunner` | +| `report.py` | 运行结果:`RunReport` / `TaskResult` | +| `errors.py` | 错误家族:`PyFlowXError` 子类 | + ## 许可证 MIT