zhou febcd90a31 refactor(graph,runner,test): 重构代码并清理冗余逻辑
1. 将Graph类改为frozen dataclass简化实现
2. 移除executors.py中的内置策略校验逻辑
3. 使用typing.get_args替代直接访问Strategy.__args__
4. 清理测试文件中冗余的无效参数测试用例
5. 统一替换测试中未使用的px.run调用返回值
6. 在pyproject.toml中添加pytest slow标记配置
2026-06-21 14:11:57 +08:00
2026-06-20 09:33:53 +08:00

PyFlowX

轻量、类型安全的 DAG 任务调度器。

CI PyPI Python Coverage License

PyFlowX 把"任务依赖"这件事做到极致简单:参数名就是依赖声明。无需装饰器、 无需样板包装器,写一个普通函数,框架按参数名自动注入上游结果。

特性

  • 零样板 —— 参数名即依赖,框架自动注入上游结果
  • 三种执行策略 —— sequential(调试)/ threadI/O 密集同步)/ asyncI/O 密集异步)
  • 类型安全 —— TaskSpec[T] 把返回类型一路传到 RunReportmypy strict 通过
  • DAG 校验 —— 构建时即时校验重名、缺失依赖、环
  • 自动分层 —— Kahn 算法分组,同层任务可并行
  • 重试与超时 —— 每个任务独立配置 retriestimeout
  • 断点续跑 —— MemoryBackend / JSONBackend,成功结果可缓存复用
  • 可观测 —— on_event 回调、dry_run 预览、Mermaid 可视化
  • 零运行时依赖 —— 仅依赖标准库(3.8 需 graphlib_backport
  • 100% 测试覆盖 —— 分支覆盖率达 100%

安装

pip install pyflowx

或使用 uv

uv add pyflowx

快速上手

import pyflowx as px

def extract() -> list[int]:
    return [1, 2, 3]

# 参数名 extract 自动匹配上游任务名 → 自动注入
def double(extract: list[int]) -> list[int]:
    return [x * 2 for x in extract]

graph = px.Graph.from_specs([
    px.TaskSpec("extract", extract),
    px.TaskSpec("double", double, ("extract",)),
])

report = px.run(graph, strategy="sequential")
print(report["double"])  # [2, 4, 6]

核心概念

TaskSpec —— 任务描述

TaskSpec 是不可变的任务描述符,是唯一需要配置的东西:

px.TaskSpec(
    name="fetch_user",           # 唯一标识
    fn=fetch_user,               # 同步或异步函数
    depends_on=("auth",),        # 依赖的任务名
    args=(uid,),                 # 静态位置参数(追加在注入参数后)
    kwargs={"timeout": 30},      # 静态关键字参数
    retries=3,                   # 失败重试次数(0 = 仅一次)
    timeout=30.0,                # 超时秒数(None = 不限制)
    tags=("api", "user"),        # 自由标签,用于子图过滤
)

Graph —— DAG 构建

graph = px.Graph.from_specs([...])   # 整批校验(推荐)
# 或增量构建
graph = px.Graph()
graph.add(px.TaskSpec("a", fn_a))
graph.add(px.TaskSpec("b", fn_b, ("a",)))

graph.validate()              # 显式校验(环检测)
graph.layers()                # 拓扑分层
graph.to_mermaid()            # Mermaid 可视化
graph.describe()              # 人类可读摘要
graph.subgraph(("api",))      # 按标签切片
graph.subgraph_by_names(("a", "b"))  # 按名称切片

run —— 执行

report = px.run(
    graph,
    strategy="async",          # sequential | thread | async
    max_workers=8,             # thread 策略的线程池大小
    dry_run=False,             # True = 仅打印计划
    on_event=callback,         # 状态转换回调
    state=px.JSONBackend("state.json"),  # 断点续跑后端
)

RunReport —— 结果

report["task_name"]            # 任务返回值
report.result_of("task_name")  # 完整 TaskResult
report.success                 # 整体是否成功
report.summary()               # 统计字典
report.failed_tasks()          # 失败任务名列表
report.describe()              # 人类可读报告

上下文注入规则

按顺序求值:

  1. 标注为 Context 的参数 → 接收完整上游结果映射
  2. 名称匹配依赖 的参数 → 接收该依赖的结果
  3. **kwargs 参数 → 接收所有依赖结果(dict)
  4. TaskSpec.args / kwargs → 为非依赖参数提供静态值
from typing import Any, Dict

def aggregate(ctx: px.Context) -> Dict[str, Any]:
    """ctx 包含所有 depends_on 任务的返回值。"""
    return dict(ctx)

def merge(fetch_a: str, fetch_b: str) -> str:
    """fetch_a / fetch_b 自动注入。"""
    return fetch_a + fetch_b

def fetch_user(uid: int) -> dict:  # uid 来自 TaskSpec.args
    ...

执行策略对比

策略 并发模型 适用场景 同步任务 异步任务
sequential 串行 调试、CPU 密集 直接调用 事件循环
thread 线程池 I/O 密集同步 线程池 不支持
async 事件循环 I/O 密集异步 卸载到线程池 事件循环

所有策略都遵循 retriestimeout、上下文注入、状态后端,并发出 TaskEvent

示例

仓库 examples/ 目录包含完整示例:

运行:

python examples/etl_pipeline.py
python examples/parallel_run.py
python examples/async_aggregation.py

断点续跑

from pyflowx import JSONBackend

# 第一次运行:成功结果写入 state.json
backend = JSONBackend("state.json")
report = px.run(graph, strategy="sequential", state=backend)

# 第二次运行:已缓存任务自动跳过
report = px.run(graph, strategy="sequential", state=backend)
# report.results 中缓存任务状态为 SKIPPED

错误处理

所有错误都是 PyFlowXError 的子类:

错误 触发时机
DuplicateTaskError 任务名重复注册
MissingDependencyError 依赖了不存在的任务
CycleError 依赖图存在环
TaskFailedError 任务耗尽重试后仍失败
TaskTimeoutError 任务超时
InjectionError 上下文注入无法满足签名
StorageError 状态后端持久化失败
try:
    report = px.run(graph, strategy="async")
except px.TaskFailedError as exc:
    print(f"{exc.task} 失败: {exc.cause}(尝试 {exc.attempts} 次)")
except px.PyFlowXError:
    # 捕获整个错误家族
    raise

与其他库对比

特性 PyFlowX Airflow Prefect Dask
零样板 参数名即依赖 装饰器 + XCom 装饰器 装饰器
运行时依赖 仅标准库 重型 中型 中型
类型安全 mypy strict
异步原生 部分
断点续跑 内置 需配置 需配置 需配置
学习曲线 极低
适用规模 单机 集群 单机/集群 集群

PyFlowX 专注于单机 DAG 调度的极致简洁,适合 ETL、数据处理、CI 流水线等场景。

开发

# 安装开发依赖
uv sync --extra dev

# 运行测试(含覆盖率)
uv run pytest --cov=pyflowx --cov-fail-under=100

# 类型检查
uv run mypy

# 代码风格
uv run ruff check src tests examples
uv run ruff format --check src tests examples

许可证

MIT

S
Description
No description provided
Readme MIT 1.2 MiB
Languages
Python 100%