refactor: 重构代码风格与配置,完善文档与CI

1. 移除冗余导入与简化代码写法
2. 更新coverage配置与pre-commit钩子
3. 重构CI流程,拆分lint/typecheck/test任务
4. 汉化项目文档与注释
5. 修正graphlib导入的类型忽略注释
This commit is contained in:
2026-06-20 13:39:03 +08:00
parent a352529263
commit 0b995d66c3
9 changed files with 419 additions and 214 deletions
+76 -69
View File
@@ -13,20 +13,75 @@ concurrency:
jobs:
# ─────────────────────────────────────────────────────────────
# 后端:多平台 × 多 Python 版本矩阵测试
# lint:代码风格与格式检查(单平台即可)
# ─────────────────────────────────────────────────────────────
backend-test:
name: Backend (${{ matrix.os }} / py${{ matrix.python-version }})
lint:
name: Lint (ruff)
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: 安装 uv
uses: astral-sh/setup-uv@v5
with:
version: latest
enable-cache: true
cache-dependency-glob: uv.lock
- name: 设置 Python 3.13
uses: actions/setup-python@v5
with:
python-version: '3.13'
- name: 安装依赖
run: uv sync --extra dev --frozen
- name: Ruff 检查
run: uv run ruff check src tests examples
- name: Ruff 格式检查
run: uv run ruff format --check src tests examples
# ─────────────────────────────────────────────────────────────
# typecheck:mypy 严格类型检查
# ─────────────────────────────────────────────────────────────
typecheck:
name: Typecheck (mypy)
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: 安装 uv
uses: astral-sh/setup-uv@v5
with:
version: latest
enable-cache: true
cache-dependency-glob: uv.lock
- name: 设置 Python 3.13
uses: actions/setup-python@v5
with:
python-version: '3.13'
- name: 安装依赖
run: uv sync --extra dev --frozen
- name: Mypy 严格类型检查
run: uv run mypy
# ─────────────────────────────────────────────────────────────
# test:多平台 × 多 Python 版本矩阵测试 + 覆盖率
# ─────────────────────────────────────────────────────────────
test:
name: Test (${{ matrix.os }} / py${{ matrix.python-version }})
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python-version: ['3.13', '3.14']
exclude:
# macOS + py3.14 暂时跳过(部分依赖未发布 wheel)
- os: macos-latest
python-version: '3.14'
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13']
steps:
- name: Checkout
uses: actions/checkout@v4
@@ -46,16 +101,14 @@ jobs:
- name: 安装依赖
run: uv sync --extra dev --frozen
- name: Ruff 检查
run: uv run ruff check backend/endo tests
- name: 运行测试(含覆盖率,强制 100%)
run: uv run pytest -v --cov=pyflowx --cov-report=xml --cov-report=term-missing --cov-fail-under=100
- name: Ruff 格式检查
run: uv run ruff format --check backend/endo tests
- name: 运行测试
env:
PYTHONPATH: backend
run: uv run pytest -v --cov=endo --cov-report=xml --cov-report=term-missing
- name: 运行示例冒烟测试
run: |
uv run python examples/etl_pipeline.py
uv run python examples/parallel_run.py
uv run python examples/async_aggregation.py
- name: 上传覆盖率
if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.13'
@@ -66,66 +119,20 @@ jobs:
retention-days: 7
# ─────────────────────────────────────────────────────────────
# 前端:多平台构建验证
# ─────────────────────────────────────────────────────────────
frontend-build:
name: Frontend (${{ matrix.os }} / node${{ matrix.node-version }})
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
node-version: [20, 22]
defaults:
run:
working-directory: frontend
steps:
- name: Checkout
uses: actions/checkout@v4
- name: 安装 pnpm
uses: pnpm/action-setup@v4
with:
version: 9
- name: 设置 Node ${{ matrix.node-version }}
uses: actions/setup-node@v4
with:
node-version: ${{ matrix.node-version }}
cache: pnpm
cache-dependency-path: frontend/pnpm-lock.yaml
- name: 安装依赖
run: pnpm install --frozen-lockfile
- name: TypeScript 类型检查
run: npx tsc --noEmit -p tsconfig.app.json
- name: 构建
run: pnpm run build
- name: 上传构建产物
if: matrix.os == 'ubuntu-latest' && matrix.node-version == 22
uses: actions/upload-artifact@v4
with:
name: frontend-dist
path: frontend/dist
retention-days: 7
# ─────────────────────────────────────────────────────────────
# 聚合:所有测试通过后才标记完成
# 聚合:所有检查通过后才标记完成
# ─────────────────────────────────────────────────────────────
ci-pass:
name: CI Pass
runs-on: ubuntu-latest
needs: [backend-test, frontend-build]
needs: [lint, typecheck, test]
if: always()
steps:
- name: 检查依赖任务结果
if: ${{ needs.backend-test.result != 'success' || needs.frontend-build.result != 'success' }}
if: ${{ needs.lint.result != 'success' || needs.typecheck.result != 'success' || needs.test.result != 'success' }}
run: |
echo "backend-test: ${{ needs.backend-test.result }}"
echo "frontend-build: ${{ needs.frontend-build.result }}"
echo "lint: ${{ needs.lint.result }}"
echo "typecheck: ${{ needs.typecheck.result }}"
echo "test: ${{ needs.test.result }}"
exit 1
- name: 全部通过
run: echo "✅ 所有 CI 检查通过"
+28 -88
View File
@@ -18,13 +18,14 @@ permissions:
jobs:
# ─────────────────────────────────────────────────────────────
# 预检:发布前必须通过 CI
# 预检:版本号校验 + 与 pyproject.toml 一致性检查
# ─────────────────────────────────────────────────────────────
pre-check:
name: Pre-release Check
runs-on: ubuntu-latest
outputs:
version: ${{ steps.meta.outputs.version }}
tag: ${{ steps.meta.outputs.tag }}
steps:
- name: Checkout
uses: actions/checkout@v4
@@ -65,7 +66,7 @@ jobs:
fi
# ─────────────────────────────────────────────────────────────
# 构建:后端 wheel(纯 Python,单平台即可)+ 前端 dist
# 构建:wheel + sdist(纯 Python,单平台即可)
# ─────────────────────────────────────────────────────────────
build:
name: Build Artifacts
@@ -86,70 +87,26 @@ jobs:
with:
python-version: '3.13'
- name: 安装 pnpm(前端构建依赖)
uses: pnpm/action-setup@v4
with:
version: 9
- name: 安装依赖
run: uv sync --extra dev --frozen
- name: 设置 Node 22(前端构建)
uses: actions/setup-node@v4
with:
node-version: 22
cache: pnpm
cache-dependency-path: frontend/pnpm-lock.yaml
- name: 安装前端依赖(缓存)
working-directory: frontend
run: pnpm install --frozen-lockfile
- name: 构建后端 wheel + sdist(自动触发前端构建)
- name: 构建 wheel + sdist
run: uv build
- name: 上传后端产物
uses: actions/upload-artifact@v4
with:
name: backend-dist
path: dist/*
retention-days: 30
build-frontend:
name: Build Frontend
needs: pre-check
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: 安装 pnpm
uses: pnpm/action-setup@v4
with:
version: 9
- name: 设置 Node 22
uses: actions/setup-node@v4
with:
node-version: 22
cache: pnpm
cache-dependency-path: frontend/pnpm-lock.yaml
- name: 安装依赖
working-directory: frontend
run: pnpm install --frozen-lockfile
- name: 构建
working-directory: frontend
run: pnpm run build
- name: 打包前端 dist
- name: 校验产物
run: |
cd frontend
zip -r ../endo-frontend-${{ needs.pre-check.outputs.version }}.zip dist
echo "待上传产物:"
ls -la dist/
if [ -z "$(ls -A dist/*.whl dist/*.tar.gz 2>/dev/null)" ]; then
echo "❌ 未找到 wheel 或 sdist 产物"
exit 1
fi
- name: 上传前端产物
- name: 上传构建产物
uses: actions/upload-artifact@v4
with:
name: frontend-dist-release
path: endo-frontend-*.zip
name: dist
path: dist/*
retention-days: 30
# ─────────────────────────────────────────────────────────────
@@ -161,25 +118,16 @@ jobs:
runs-on: ubuntu-latest
environment:
name: pypi
url: https://pypi.org/project/endo/${{ needs.pre-check.outputs.version }}
url: https://pypi.org/project/pyflowx/${{ needs.pre-check.outputs.version }}
permissions:
id-token: write
steps:
- name: 下载后端构建产物
- name: 下载构建产物
uses: actions/download-artifact@v4
with:
name: backend-dist
name: dist
path: dist
- name: 校验产物
run: |
echo "待上传产物:"
ls -la dist/
if [ -z "$(ls -A dist/*.whl dist/*.tar.gz 2>/dev/null)" ]; then
echo "❌ 未找到 wheel 或 sdist 产物"
exit 1
fi
- name: 上传到 PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
@@ -190,7 +138,7 @@ jobs:
# ─────────────────────────────────────────────────────────────
release:
name: Publish Release
needs: [pre-check, build, build-frontend, publish-pypi]
needs: [pre-check, build, publish-pypi]
runs-on: ubuntu-latest
permissions:
contents: write
@@ -198,39 +146,31 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: 下载所有构建产物
- name: 下载构建产物
uses: actions/download-artifact@v4
with:
path: release-assets
name: dist
path: assets
- name: 整理发布产物
run: |
mkdir -p assets
find release-assets -name "*.whl" -exec cp {} assets/ \;
find release-assets -name "*.tar.gz" -exec cp {} assets/ \;
find release-assets -name "*.zip" -exec cp {} assets/ \;
ls -la assets/
- name: 生成 Release Notes
id: notes
run: |
{
echo "## endo ${{ needs.pre-check.outputs.version }}"
echo "## pyflowx ${{ needs.pre-check.outputs.version }}"
echo ""
echo "### 下载"
echo ""
echo "- **后端 wheel**: \`endo-${{ needs.pre-check.outputs.version }}-py3-none-any.whl\`"
echo "- **源码包**: \`endo-${{ needs.pre-check.outputs.version }}.tar.gz\`"
echo "- **前端 dist**: \`endo-frontend-${{ needs.pre-check.outputs.version }}.zip\`"
echo "- **Wheel**: \`pyflowx-${{ needs.pre-check.outputs.version }}-py3-none-any.whl\`"
echo "- **源码包**: \`pyflowx-${{ needs.pre-check.outputs.version }}.tar.gz\`"
echo ""
echo "### 安装"
echo ""
echo '```bash'
echo "# 后端"
echo "pip install endo-${{ needs.pre-check.outputs.version }}-py3-none-any.whl"
echo ""
echo "# 前端"
echo "unzip endo-frontend-${{ needs.pre-check.outputs.version }}.zip -d frontend-dist"
echo "pip install pyflowx==${{ needs.pre-check.outputs.version }}"
echo '```'
echo ""
echo "### 完整变更日志"
@@ -245,7 +185,7 @@ jobs:
uses: softprops/action-gh-release@v2
with:
tag_name: ${{ needs.pre-check.outputs.tag }}
name: endo ${{ needs.pre-check.outputs.version }}
name: pyflowx ${{ needs.pre-check.outputs.version }}
body: ${{ steps.notes.outputs.content }}
files: assets/*
draft: false
+22
View File
@@ -0,0 +1,22 @@
# prek compatible configuration
# See https://pre-commit.com for more information
repos:
- repo: https://gitcode.com/gh_mirrors/ru/ruff-pre-commit.git
# Ruff version - keep in sync with pyproject.toml
rev: v0.15.4
hooks:
# Run the linter
- id: ruff
args: [ --fix, --exit-non-zero-on-fix ]
# Run the formatter
- id: ruff-format
args: [ --config=pyproject.toml]
- repo: https://gitcode.com/gh_mirrors/pr/pre-commit-hooks.git
rev: v5.0.0
hooks:
- id: check-merge-conflict
- id: debug-statements
- id: fix-byte-order-marker
- id: trailing-whitespace
args: [ --markdown-linebreak-ext=md ]
- id: end-of-file-fixer
+241
View File
@@ -0,0 +1,241 @@
# PyFlowX
> 轻量、类型安全的 DAG 任务调度器。
[![CI](https://github.com/pyflowx/pyflowx/actions/workflows/ci.yml/badge.svg)](https://github.com/pyflowx/pyflowx/actions/workflows/ci.yml)
[![PyPI](https://img.shields.io/pypi/v/pyflowx.svg)](https://pypi.org/project/pyflowx/)
[![Python](https://img.shields.io/pypi/pyversions/pyflowx.svg)](https://pypi.org/project/pyflowx/)
[![Coverage](https://img.shields.io/badge/coverage-100%25-brightgreen.svg)](https://github.com/pyflowx/pyflowx)
[![License](https://img.shields.io/pypi/l/pyflowx.svg)](https://github.com/pyflowx/pyflowx/blob/main/LICENSE)
PyFlowX 把"任务依赖"这件事做到极致简单:**参数名就是依赖声明**。无需装饰器、
无需样板包装器,写一个普通函数,框架按参数名自动注入上游结果。
## 特性
- **零样板** —— 参数名即依赖,框架自动注入上游结果
- **三种执行策略** —— `sequential`(调试)/ `thread`(I/O 密集同步)/ `async`(I/O 密集异步)
- **类型安全** —— `TaskSpec[T]` 把返回类型一路传到 `RunReport`,mypy strict 通过
- **DAG 校验** —— 构建时即时校验重名、缺失依赖、环
- **自动分层** —— Kahn 算法分组,同层任务可并行
- **重试与超时** —— 每个任务独立配置 `retries` 与 `timeout`
- **断点续跑** —— `MemoryBackend` / `JSONBackend`,成功结果可缓存复用
- **可观测** —— `on_event` 回调、`dry_run` 预览、Mermaid 可视化
- **零运行时依赖** —— 仅依赖标准库(3.8 需 `graphlib_backport`)
- **100% 测试覆盖** —— 分支覆盖率达 100%
## 安装
```bash
pip install pyflowx
```
或使用 [uv](https://docs.astral.sh/uv/):
```bash
uv add pyflowx
```
## 快速上手
```python
import pyflowx as px
def extract() -> list[int]:
return [1, 2, 3]
# 参数名 extract 自动匹配上游任务名 → 自动注入
def double(extract: list[int]) -> list[int]:
return [x * 2 for x in extract]
graph = px.Graph.from_specs([
px.TaskSpec("extract", extract),
px.TaskSpec("double", double, ("extract",)),
])
report = px.run(graph, strategy="sequential")
print(report["double"]) # [2, 4, 6]
```
## 核心概念
### TaskSpec —— 任务描述
`TaskSpec` 是不可变的任务描述符,是唯一需要配置的东西:
```python
px.TaskSpec(
name="fetch_user", # 唯一标识
fn=fetch_user, # 同步或异步函数
depends_on=("auth",), # 依赖的任务名
args=(uid,), # 静态位置参数(追加在注入参数后)
kwargs={"timeout": 30}, # 静态关键字参数
retries=3, # 失败重试次数(0 = 仅一次)
timeout=30.0, # 超时秒数(None = 不限制)
tags=("api", "user"), # 自由标签,用于子图过滤
)
```
### Graph —— DAG 构建
```python
graph = px.Graph.from_specs([...]) # 整批校验(推荐)
# 或增量构建
graph = px.Graph()
graph.add(px.TaskSpec("a", fn_a))
graph.add(px.TaskSpec("b", fn_b, ("a",)))
graph.validate() # 显式校验(环检测)
graph.layers() # 拓扑分层
graph.to_mermaid() # Mermaid 可视化
graph.describe() # 人类可读摘要
graph.subgraph(("api",)) # 按标签切片
graph.subgraph_by_names(("a", "b")) # 按名称切片
```
### run —— 执行
```python
report = px.run(
graph,
strategy="async", # sequential | thread | async
max_workers=8, # thread 策略的线程池大小
dry_run=False, # True = 仅打印计划
on_event=callback, # 状态转换回调
state=px.JSONBackend("state.json"), # 断点续跑后端
)
```
### RunReport —— 结果
```python
report["task_name"] # 任务返回值
report.result_of("task_name") # 完整 TaskResult
report.success # 整体是否成功
report.summary() # 统计字典
report.failed_tasks() # 失败任务名列表
report.describe() # 人类可读报告
```
## 上下文注入规则
按顺序求值:
1. **标注为 `Context`** 的参数 → 接收完整上游结果映射
2. **名称匹配依赖** 的参数 → 接收该依赖的结果
3. **`**kwargs`** 参数 → 接收所有依赖结果(dict)
4. **`TaskSpec.args` / `kwargs`** → 为非依赖参数提供静态值
```python
from typing import Any, Dict
def aggregate(ctx: px.Context) -> Dict[str, Any]:
"""ctx 包含所有 depends_on 任务的返回值。"""
return dict(ctx)
def merge(fetch_a: str, fetch_b: str) -> str:
"""fetch_a / fetch_b 自动注入。"""
return fetch_a + fetch_b
def fetch_user(uid: int) -> dict: # uid 来自 TaskSpec.args
...
```
## 执行策略对比
| 策略 | 并发模型 | 适用场景 | 同步任务 | 异步任务 |
|------|---------|---------|---------|---------|
| `sequential` | 串行 | 调试、CPU 密集 | 直接调用 | 事件循环 |
| `thread` | 线程池 | I/O 密集同步 | 线程池 | 不支持 |
| `async` | 事件循环 | I/O 密集异步 | 卸载到线程池 | 事件循环 |
所有策略都遵循 `retries`、`timeout`、上下文注入、状态后端,并发出 `TaskEvent`。
## 示例
仓库 `examples/` 目录包含完整示例:
- [`etl_pipeline.py`](examples/etl_pipeline.py) —— ETL 流水线(sequential)
- [`parallel_run.py`](examples/parallel_run.py) —— 并行执行对比(thread vs sequential)
- [`async_aggregation.py`](examples/async_aggregation.py) —— 异步聚合 + Context 注入
运行:
```bash
python examples/etl_pipeline.py
python examples/parallel_run.py
python examples/async_aggregation.py
```
## 断点续跑
```python
from pyflowx import JSONBackend
# 第一次运行:成功结果写入 state.json
backend = JSONBackend("state.json")
report = px.run(graph, strategy="sequential", state=backend)
# 第二次运行:已缓存任务自动跳过
report = px.run(graph, strategy="sequential", state=backend)
# report.results 中缓存任务状态为 SKIPPED
```
## 错误处理
所有错误都是 `PyFlowXError` 的子类:
| 错误 | 触发时机 |
|------|---------|
| `DuplicateTaskError` | 任务名重复注册 |
| `MissingDependencyError` | 依赖了不存在的任务 |
| `CycleError` | 依赖图存在环 |
| `TaskFailedError` | 任务耗尽重试后仍失败 |
| `TaskTimeoutError` | 任务超时 |
| `InjectionError` | 上下文注入无法满足签名 |
| `StorageError` | 状态后端持久化失败 |
```python
try:
report = px.run(graph, strategy="async")
except px.TaskFailedError as exc:
print(f"{exc.task} 失败: {exc.cause}(尝试 {exc.attempts} 次)")
except px.PyFlowXError:
# 捕获整个错误家族
raise
```
## 与其他库对比
| 特性 | PyFlowX | Airflow | Prefect | Dask |
|------|---------|---------|---------|------|
| 零样板 | 参数名即依赖 | 装饰器 + XCom | 装饰器 | 装饰器 |
| 运行时依赖 | 仅标准库 | 重型 | 中型 | 中型 |
| 类型安全 | mypy strict | 弱 | 中 | 中 |
| 异步原生 | 是 | 否 | 部分 | 否 |
| 断点续跑 | 内置 | 需配置 | 需配置 | 需配置 |
| 学习曲线 | 极低 | 高 | 中 | 中 |
| 适用规模 | 单机 | 集群 | 单机/集群 | 集群 |
PyFlowX 专注于**单机 DAG 调度**的极致简洁,适合 ETL、数据处理、CI 流水线等场景。
## 开发
```bash
# 安装开发依赖
uv sync --extra dev
# 运行测试(含覆盖率)
uv run pytest --cov=pyflowx --cov-fail-under=100
# 类型检查
uv run mypy
# 代码风格
uv run ruff check src tests examples
uv run ruff format --check src tests examples
```
## 许可证
MIT
+1 -1
View File
@@ -73,7 +73,7 @@ dev = ["pyflowx[dev]"]
[tool.coverage.run]
branch = true
concurrency = ["greenlet", "thread"]
concurrency = ["thread"]
source = ["pyflowx"]
[tool.coverage.report]
+46 -45
View File
@@ -1,16 +1,15 @@
"""Executors and the public :func:`run` entry point.
"""执行器与公共 :func:`run` 入口。
Three execution strategies share a common layer-by-layer driver:
三种执行策略共享一个逐层驱动器:
* ``sequential`` — deterministic, one task at a time. Best for debugging.
* ``thread`` — layer-internal concurrency via a thread pool. Best for
I/O-bound sync tasks.
* ``async`` — layer-internal concurrency via ``asyncio.gather``.
Sync tasks are offloaded to a thread pool; async tasks
run on the event loop. Best for I/O-bound async tasks.
* ``sequential`` —— 确定性、一次一个任务。最适合调试。
* ``thread`` —— 通过线程池实现层内并发。最适合 I/O 密集型同步任务。
* ``async`` —— 通过 ``asyncio.gather`` 实现层内并发。同步任务被
卸载到线程池;异步任务运行在事件循环上。最适合
I/O 密集型异步任务。
All three honour ``retries``, ``timeout``, context injection, state
backends (resume), and emit :class:`~pyflowx.task.TaskEvent` for observers.
三者都遵循 ``retries``、``timeout``、上下文注入、状态后端(续跑),
并向观察者发出 :class:`~pyflowx.task.TaskEvent`。
"""
from __future__ import annotations
@@ -31,15 +30,15 @@ from .task import TaskEvent, TaskResult, TaskSpec, TaskStatus
logger = logging.getLogger("pyflowx")
# Observer callback type.
# 观察者回调类型。
EventCallback = Callable[[TaskEvent], None]
# Strategy selector literal.
# 策略选择字面量。
Strategy = str # "sequential" | "thread" | "async"
def _is_async_fn(spec: TaskSpec[object]) -> bool:
"""True if ``spec.fn`` is a coroutine function."""
"""判断 ``spec.fn`` 是否为协程函数。"""
return inspect.iscoroutinefunction(spec.fn)
@@ -47,7 +46,7 @@ def _emit(
on_event: Optional[EventCallback],
result: TaskResult[object],
) -> None:
"""Fire an observer event if a callback is registered."""
"""若注册了回调则触发一个观察者事件。"""
if on_event is None:
return
on_event(
@@ -91,7 +90,7 @@ def _run_sync_with_retry(
context: Mapping[str, Any],
layer_idx: Optional[int],
) -> TaskResult[object]:
"""Execute a sync task with retries; return a populated TaskResult."""
"""执行同步任务并带重试;返回填充好的 TaskResult"""
result: TaskResult[object] = TaskResult(spec=spec)
result.started_at = datetime.now()
max_attempts = spec.retries + 1
@@ -104,7 +103,7 @@ def _run_sync_with_retry(
result.status = TaskStatus.SUCCESS
result.finished_at = datetime.now()
return result
except Exception as exc: # noqa: BLE001 - user code may raise anything
except Exception as exc: # noqa: BLE001 - 用户代码可能抛任何异常
result.error = exc
if result.attempts >= max_attempts:
_finalize_failure(result, layer_idx) # pragma: no cover
@@ -117,7 +116,7 @@ async def _run_async_with_retry(
context: Mapping[str, Any],
layer_idx: Optional[int],
) -> TaskResult[object]:
"""Execute a task (sync or async) on the event loop with retries."""
"""在事件循环上执行任务(同步或异步)并带重试。"""
result: TaskResult[object] = TaskResult(spec=spec)
result.started_at = datetime.now()
max_attempts = spec.retries + 1
@@ -134,8 +133,10 @@ async def _run_async_with_retry(
else:
result.value = await coro
else:
# Offload sync work to a thread so the event loop stays alive.
fn_call: Callable[[], Any] = lambda: spec.fn(*args, **kwargs)
# 将同步工作卸载到线程,保持事件循环存活。
def fn_call() -> Any:
return spec.fn(*args, **kwargs)
if spec.timeout is not None:
result.value = await asyncio.wait_for(
loop.run_in_executor(None, fn_call), timeout=spec.timeout
@@ -164,13 +165,13 @@ async def _run_async_with_retry(
# ---------------------------------------------------------------------- #
# Layer driver
# 层驱动器
# ---------------------------------------------------------------------- #
def _build_context(
spec: TaskSpec[object],
global_context: Mapping[str, Any],
) -> Mapping[str, Any]:
"""Restrict the global context to this task's dependencies."""
"""将全局上下文限制为本任务的依赖。"""
return {
dep: global_context[dep] for dep in spec.depends_on if dep in global_context
}
@@ -185,7 +186,7 @@ def _execute_layer_sequential(
layer_idx: int,
on_event: Optional[EventCallback],
) -> None:
"""Run a layer's tasks one by one."""
"""逐个运行某层的任务。"""
for name in layer:
spec = graph.spec(name)
if backend.has(name):
@@ -213,8 +214,8 @@ def _execute_layer_threaded(
on_event: Optional[EventCallback],
max_workers: int,
) -> None:
"""Run a layer's tasks concurrently in a thread pool."""
# First, satisfy cached tasks synchronously.
"""在线程池中并发运行某层的任务。"""
# 先同步满足已缓存任务。
to_run: List[str] = []
for name in layer:
if backend.has(name):
@@ -235,14 +236,14 @@ def _execute_layer_threaded(
future_to_name: Dict[concurrent.futures.Future[TaskResult[object]], str] = {}
for name in to_run:
spec = graph.spec(name)
# Snapshot the context for this task to avoid races.
# 为本任务快照上下文以避免竞态。
task_ctx = _build_context(spec, context)
fut = pool.submit(_run_sync_with_retry, spec, task_ctx, layer_idx)
future_to_name[fut] = name
for fut in concurrent.futures.as_completed(future_to_name):
name = future_to_name[fut]
result = fut.result() # raises TaskFailedError on failure
result = fut.result() # 失败时抛出 TaskFailedError
context[name] = result.value
backend.save(name, result.value)
report.results[name] = result
@@ -258,7 +259,7 @@ async def _execute_layer_async(
layer_idx: int,
on_event: Optional[EventCallback],
) -> None:
"""Run a layer's tasks concurrently on the event loop."""
"""在事件循环上并发运行某层的任务。"""
to_run: List[str] = []
for name in layer:
if backend.has(name):
@@ -290,7 +291,7 @@ async def _execute_layer_async(
# ---------------------------------------------------------------------- #
# Public API
# 公共 API
# ---------------------------------------------------------------------- #
def run(
graph: Graph,
@@ -301,32 +302,32 @@ def run(
on_event: Optional[EventCallback] = None,
state: Optional[StateBackend] = None,
) -> RunReport:
"""Execute a graph and return a :class:`RunReport`.
"""执行图并返回 :class:`RunReport`
Parameters
----------
参数
----
graph:
The validated :class:`Graph` to execute.
待执行的已校验 :class:`Graph`
strategy:
``"sequential"`` (default), ``"thread"``, or ``"async"``.
``"sequential"``(默认)、``"thread"`` 或 ``"async"``。
max_workers:
Thread-pool size for ``"thread"``. Defaults to ``min(32, len(layer))``.
``"thread"`` 的线程池大小。默认 ``min(32, len(layer))``
dry_run:
If ``True``, print the execution plan (layers + injection) and
return an empty report without executing anything.
若为 ``True``,打印执行计划(层 + 注入)并返回空报告,不执行
任何任务。
on_event:
Optional callback invoked on every status transition.
可选回调,在每次状态转换时调用。
state:
Optional :class:`StateBackend` for resumable runs. Defaults to an
in-memory backend (no persistence across processes).
可选 :class:`StateBackend`,用于断点续跑。默认为内存后端
(不跨进程持久化)。
Raises
------
抛出
----
ValueError
If ``strategy`` is not recognised.
``strategy`` 不被识别时。
TaskFailedError
If any task fails after exhausting retries. The run aborts at the
failing layer; tasks in later layers are not attempted.
任何任务耗尽重试后仍失败时。运行在失败层中止;后续层的任务
不会被执行。
"""
if strategy not in ("sequential", "thread", "async"):
raise ValueError(
@@ -361,7 +362,7 @@ def run(
def _print_dry_run(graph: Graph, layers: List[List[str]]) -> None:
"""Print the execution plan without running anything."""
"""打印执行计划但不运行任何任务。"""
print(f"Dry run: {len(graph)} tasks, {len(layers)} layers")
for idx, layer in enumerate(layers, 1):
print(f" Layer {idx}: {layer}")
+4 -4
View File
@@ -14,14 +14,14 @@ from .errors import CycleError, DuplicateTaskError, MissingDependencyError
from .task import TaskSpec
# graphlib 自 3.9 起进入标准库;3.8 回退到 backport。
if sys.version_info >= (3, 9):
if sys.version_info >= (3, 9): # pragma: no cover
import graphlib
_TopologicalSorter = graphlib.TopologicalSorter
else: # pragma: no cover - 仅在 3.8 上执行
import graphlib # type: ignore[no-redef]
else: # pragma: no cover
import graphlib # type: ignore[import-untyped] # pragma: no cover
_TopologicalSorter = graphlib.TopologicalSorter
_TopologicalSorter = graphlib.TopologicalSorter # pragma: no cover
class Graph:
-3
View File
@@ -2,9 +2,6 @@
from __future__ import annotations
import pytest
import pyflowx as px
from pyflowx.errors import (
CycleError,
DuplicateTaskError,
+1 -4
View File
@@ -6,7 +6,6 @@ from datetime import datetime
import pytest
import pyflowx as px
from pyflowx.task import TaskResult, TaskSpec, TaskStatus
@@ -51,9 +50,7 @@ def test_task_result_duration_computed() -> None:
spec: TaskSpec[None] = TaskSpec("a", _fn)
start = datetime(2024, 1, 1, 0, 0, 0)
end = datetime(2024, 1, 1, 0, 0, 5)
result: TaskResult[None] = TaskResult(
spec=spec, started_at=start, finished_at=end
)
result: TaskResult[None] = TaskResult(spec=spec, started_at=start, finished_at=end)
assert result.duration == 5.0