From 0b995d66c38540717aec158f9fd9334b61e66968 Mon Sep 17 00:00:00 2001 From: gooker_young Date: Sat, 20 Jun 2026 13:39:03 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E9=87=8D=E6=9E=84=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E9=A3=8E=E6=A0=BC=E4=B8=8E=E9=85=8D=E7=BD=AE=EF=BC=8C?= =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=96=87=E6=A1=A3=E4=B8=8ECI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 移除冗余导入与简化代码写法 2. 更新coverage配置与pre-commit钩子 3. 重构CI流程,拆分lint/typecheck/test任务 4. 汉化项目文档与注释 5. 修正graphlib导入的类型忽略注释 --- .github/workflows/ci.yml | 145 ++++++++++---------- .github/workflows/release.yml | 116 ++++------------ .pre-commit-config.yaml | 22 ++++ README.md | 241 ++++++++++++++++++++++++++++++++++ pyproject.toml | 2 +- src/pyflowx/executors.py | 91 ++++++------- src/pyflowx/graph.py | 8 +- tests/test_errors.py | 3 - tests/test_task.py | 5 +- 9 files changed, 419 insertions(+), 214 deletions(-) create mode 100644 .pre-commit-config.yaml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 485a295..26c5105 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,20 +13,75 @@ concurrency: jobs: # ───────────────────────────────────────────────────────────── - # 后端:多平台 × 多 Python 版本矩阵测试 + # lint:代码风格与格式检查(单平台即可) # ───────────────────────────────────────────────────────────── - backend-test: - name: Backend (${{ matrix.os }} / py${{ matrix.python-version }}) + lint: + name: Lint (ruff) + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: 安装 uv + uses: astral-sh/setup-uv@v5 + with: + version: latest + enable-cache: true + cache-dependency-glob: uv.lock + + - name: 设置 Python 3.13 + uses: actions/setup-python@v5 + with: + python-version: '3.13' + + - name: 安装依赖 + run: uv sync --extra dev --frozen + + - name: Ruff 检查 + run: uv run ruff check src tests examples + + - name: Ruff 格式检查 + run: uv run ruff format --check src tests examples + + # ───────────────────────────────────────────────────────────── + # typecheck:mypy 严格类型检查 + # ───────────────────────────────────────────────────────────── + typecheck: + name: Typecheck (mypy) + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: 安装 uv + uses: astral-sh/setup-uv@v5 + with: + version: latest + enable-cache: true + cache-dependency-glob: uv.lock + + - name: 设置 Python 3.13 + uses: actions/setup-python@v5 + with: + python-version: '3.13' + + - name: 安装依赖 + run: uv sync --extra dev --frozen + + - name: Mypy 严格类型检查 + run: uv run mypy + + # ───────────────────────────────────────────────────────────── + # test:多平台 × 多 Python 版本矩阵测试 + 覆盖率 + # ───────────────────────────────────────────────────────────── + test: + name: Test (${{ matrix.os }} / py${{ matrix.python-version }}) runs-on: ${{ matrix.os }} strategy: fail-fast: false matrix: os: [ubuntu-latest, windows-latest, macos-latest] - python-version: ['3.13', '3.14'] - exclude: - # macOS + py3.14 暂时跳过(部分依赖未发布 wheel) - - os: macos-latest - python-version: '3.14' + python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13'] steps: - name: Checkout uses: actions/checkout@v4 @@ -46,16 +101,14 @@ jobs: - name: 安装依赖 run: uv sync --extra dev --frozen - - name: Ruff 检查 - run: uv run ruff check backend/endo tests + - name: 运行测试(含覆盖率,强制 100%) + run: uv run pytest -v --cov=pyflowx --cov-report=xml --cov-report=term-missing --cov-fail-under=100 - - name: Ruff 格式检查 - run: uv run ruff format --check backend/endo tests - - - name: 运行测试 - env: - PYTHONPATH: backend - run: uv run pytest -v --cov=endo --cov-report=xml --cov-report=term-missing + - name: 运行示例冒烟测试 + run: | + uv run python examples/etl_pipeline.py + uv run python examples/parallel_run.py + uv run python examples/async_aggregation.py - name: 上传覆盖率 if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.13' @@ -66,66 +119,20 @@ jobs: retention-days: 7 # ───────────────────────────────────────────────────────────── - # 前端:多平台构建验证 - # ───────────────────────────────────────────────────────────── - frontend-build: - name: Frontend (${{ matrix.os }} / node${{ matrix.node-version }}) - runs-on: ${{ matrix.os }} - strategy: - fail-fast: false - matrix: - os: [ubuntu-latest, windows-latest, macos-latest] - node-version: [20, 22] - defaults: - run: - working-directory: frontend - steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: 安装 pnpm - uses: pnpm/action-setup@v4 - with: - version: 9 - - - name: 设置 Node ${{ matrix.node-version }} - uses: actions/setup-node@v4 - with: - node-version: ${{ matrix.node-version }} - cache: pnpm - cache-dependency-path: frontend/pnpm-lock.yaml - - - name: 安装依赖 - run: pnpm install --frozen-lockfile - - - name: TypeScript 类型检查 - run: npx tsc --noEmit -p tsconfig.app.json - - - name: 构建 - run: pnpm run build - - - name: 上传构建产物 - if: matrix.os == 'ubuntu-latest' && matrix.node-version == 22 - uses: actions/upload-artifact@v4 - with: - name: frontend-dist - path: frontend/dist - retention-days: 7 - - # ───────────────────────────────────────────────────────────── - # 聚合:所有测试通过后才标记完成 + # 聚合:所有检查通过后才标记完成 # ───────────────────────────────────────────────────────────── ci-pass: name: CI Pass runs-on: ubuntu-latest - needs: [backend-test, frontend-build] + needs: [lint, typecheck, test] if: always() steps: - name: 检查依赖任务结果 - if: ${{ needs.backend-test.result != 'success' || needs.frontend-build.result != 'success' }} + if: ${{ needs.lint.result != 'success' || needs.typecheck.result != 'success' || needs.test.result != 'success' }} run: | - echo "backend-test: ${{ needs.backend-test.result }}" - echo "frontend-build: ${{ needs.frontend-build.result }}" + echo "lint: ${{ needs.lint.result }}" + echo "typecheck: ${{ needs.typecheck.result }}" + echo "test: ${{ needs.test.result }}" exit 1 - name: 全部通过 run: echo "✅ 所有 CI 检查通过" diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 2cdbc5d..0eb1542 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -18,13 +18,14 @@ permissions: jobs: # ───────────────────────────────────────────────────────────── - # 预检:发布前必须通过 CI + # 预检:版本号校验 + 与 pyproject.toml 一致性检查 # ───────────────────────────────────────────────────────────── pre-check: name: Pre-release Check runs-on: ubuntu-latest outputs: version: ${{ steps.meta.outputs.version }} + tag: ${{ steps.meta.outputs.tag }} steps: - name: Checkout uses: actions/checkout@v4 @@ -65,7 +66,7 @@ jobs: fi # ───────────────────────────────────────────────────────────── - # 构建:后端 wheel(纯 Python,单平台即可)+ 前端 dist + # 构建:wheel + sdist(纯 Python,单平台即可) # ───────────────────────────────────────────────────────────── build: name: Build Artifacts @@ -86,70 +87,26 @@ jobs: with: python-version: '3.13' - - name: 安装 pnpm(前端构建依赖) - uses: pnpm/action-setup@v4 - with: - version: 9 + - name: 安装依赖 + run: uv sync --extra dev --frozen - - name: 设置 Node 22(前端构建) - uses: actions/setup-node@v4 - with: - node-version: 22 - cache: pnpm - cache-dependency-path: frontend/pnpm-lock.yaml - - - name: 安装前端依赖(缓存) - working-directory: frontend - run: pnpm install --frozen-lockfile - - - name: 构建后端 wheel + sdist(自动触发前端构建) + - name: 构建 wheel + sdist run: uv build - - name: 上传后端产物 - uses: actions/upload-artifact@v4 - with: - name: backend-dist - path: dist/* - retention-days: 30 - - build-frontend: - name: Build Frontend - needs: pre-check - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: 安装 pnpm - uses: pnpm/action-setup@v4 - with: - version: 9 - - - name: 设置 Node 22 - uses: actions/setup-node@v4 - with: - node-version: 22 - cache: pnpm - cache-dependency-path: frontend/pnpm-lock.yaml - - - name: 安装依赖 - working-directory: frontend - run: pnpm install --frozen-lockfile - - - name: 构建 - working-directory: frontend - run: pnpm run build - - - name: 打包前端 dist + - name: 校验产物 run: | - cd frontend - zip -r ../endo-frontend-${{ needs.pre-check.outputs.version }}.zip dist + echo "待上传产物:" + ls -la dist/ + if [ -z "$(ls -A dist/*.whl dist/*.tar.gz 2>/dev/null)" ]; then + echo "❌ 未找到 wheel 或 sdist 产物" + exit 1 + fi - - name: 上传前端产物 + - name: 上传构建产物 uses: actions/upload-artifact@v4 with: - name: frontend-dist-release - path: endo-frontend-*.zip + name: dist + path: dist/* retention-days: 30 # ───────────────────────────────────────────────────────────── @@ -161,25 +118,16 @@ jobs: runs-on: ubuntu-latest environment: name: pypi - url: https://pypi.org/project/endo/${{ needs.pre-check.outputs.version }} + url: https://pypi.org/project/pyflowx/${{ needs.pre-check.outputs.version }} permissions: id-token: write steps: - - name: 下载后端构建产物 + - name: 下载构建产物 uses: actions/download-artifact@v4 with: - name: backend-dist + name: dist path: dist - - name: 校验产物 - run: | - echo "待上传产物:" - ls -la dist/ - if [ -z "$(ls -A dist/*.whl dist/*.tar.gz 2>/dev/null)" ]; then - echo "❌ 未找到 wheel 或 sdist 产物" - exit 1 - fi - - name: 上传到 PyPI uses: pypa/gh-action-pypi-publish@release/v1 with: @@ -190,7 +138,7 @@ jobs: # ───────────────────────────────────────────────────────────── release: name: Publish Release - needs: [pre-check, build, build-frontend, publish-pypi] + needs: [pre-check, build, publish-pypi] runs-on: ubuntu-latest permissions: contents: write @@ -198,39 +146,31 @@ jobs: - name: Checkout uses: actions/checkout@v4 - - name: 下载所有构建产物 + - name: 下载构建产物 uses: actions/download-artifact@v4 with: - path: release-assets + name: dist + path: assets - name: 整理发布产物 run: | - mkdir -p assets - find release-assets -name "*.whl" -exec cp {} assets/ \; - find release-assets -name "*.tar.gz" -exec cp {} assets/ \; - find release-assets -name "*.zip" -exec cp {} assets/ \; ls -la assets/ - name: 生成 Release Notes id: notes run: | { - echo "## endo ${{ needs.pre-check.outputs.version }}" + echo "## pyflowx ${{ needs.pre-check.outputs.version }}" echo "" echo "### 下载" echo "" - echo "- **后端 wheel**: \`endo-${{ needs.pre-check.outputs.version }}-py3-none-any.whl\`" - echo "- **源码包**: \`endo-${{ needs.pre-check.outputs.version }}.tar.gz\`" - echo "- **前端 dist**: \`endo-frontend-${{ needs.pre-check.outputs.version }}.zip\`" + echo "- **Wheel**: \`pyflowx-${{ needs.pre-check.outputs.version }}-py3-none-any.whl\`" + echo "- **源码包**: \`pyflowx-${{ needs.pre-check.outputs.version }}.tar.gz\`" echo "" echo "### 安装" echo "" echo '```bash' - echo "# 后端" - echo "pip install endo-${{ needs.pre-check.outputs.version }}-py3-none-any.whl" - echo "" - echo "# 前端" - echo "unzip endo-frontend-${{ needs.pre-check.outputs.version }}.zip -d frontend-dist" + echo "pip install pyflowx==${{ needs.pre-check.outputs.version }}" echo '```' echo "" echo "### 完整变更日志" @@ -245,7 +185,7 @@ jobs: uses: softprops/action-gh-release@v2 with: tag_name: ${{ needs.pre-check.outputs.tag }} - name: endo ${{ needs.pre-check.outputs.version }} + name: pyflowx ${{ needs.pre-check.outputs.version }} body: ${{ steps.notes.outputs.content }} files: assets/* draft: false diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..e928f5d --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,22 @@ +# prek compatible configuration +# See https://pre-commit.com for more information +repos: + - repo: https://gitcode.com/gh_mirrors/ru/ruff-pre-commit.git + # Ruff version - keep in sync with pyproject.toml + rev: v0.15.4 + hooks: + # Run the linter + - id: ruff + args: [ --fix, --exit-non-zero-on-fix ] + # Run the formatter + - id: ruff-format + args: [ --config=pyproject.toml] + - repo: https://gitcode.com/gh_mirrors/pr/pre-commit-hooks.git + rev: v5.0.0 + hooks: + - id: check-merge-conflict + - id: debug-statements + - id: fix-byte-order-marker + - id: trailing-whitespace + args: [ --markdown-linebreak-ext=md ] + - id: end-of-file-fixer diff --git a/README.md b/README.md index e69de29..fd8f7d4 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,241 @@ +# PyFlowX + +> 轻量、类型安全的 DAG 任务调度器。 + +[![CI](https://github.com/pyflowx/pyflowx/actions/workflows/ci.yml/badge.svg)](https://github.com/pyflowx/pyflowx/actions/workflows/ci.yml) +[![PyPI](https://img.shields.io/pypi/v/pyflowx.svg)](https://pypi.org/project/pyflowx/) +[![Python](https://img.shields.io/pypi/pyversions/pyflowx.svg)](https://pypi.org/project/pyflowx/) +[![Coverage](https://img.shields.io/badge/coverage-100%25-brightgreen.svg)](https://github.com/pyflowx/pyflowx) +[![License](https://img.shields.io/pypi/l/pyflowx.svg)](https://github.com/pyflowx/pyflowx/blob/main/LICENSE) + +PyFlowX 把"任务依赖"这件事做到极致简单:**参数名就是依赖声明**。无需装饰器、 +无需样板包装器,写一个普通函数,框架按参数名自动注入上游结果。 + +## 特性 + +- **零样板** —— 参数名即依赖,框架自动注入上游结果 +- **三种执行策略** —— `sequential`(调试)/ `thread`(I/O 密集同步)/ `async`(I/O 密集异步) +- **类型安全** —— `TaskSpec[T]` 把返回类型一路传到 `RunReport`,mypy strict 通过 +- **DAG 校验** —— 构建时即时校验重名、缺失依赖、环 +- **自动分层** —— Kahn 算法分组,同层任务可并行 +- **重试与超时** —— 每个任务独立配置 `retries` 与 `timeout` +- **断点续跑** —— `MemoryBackend` / `JSONBackend`,成功结果可缓存复用 +- **可观测** —— `on_event` 回调、`dry_run` 预览、Mermaid 可视化 +- **零运行时依赖** —— 仅依赖标准库(3.8 需 `graphlib_backport`) +- **100% 测试覆盖** —— 分支覆盖率达 100% + +## 安装 + +```bash +pip install pyflowx +``` + +或使用 [uv](https://docs.astral.sh/uv/): + +```bash +uv add pyflowx +``` + +## 快速上手 + +```python +import pyflowx as px + +def extract() -> list[int]: + return [1, 2, 3] + +# 参数名 extract 自动匹配上游任务名 → 自动注入 +def double(extract: list[int]) -> list[int]: + return [x * 2 for x in extract] + +graph = px.Graph.from_specs([ + px.TaskSpec("extract", extract), + px.TaskSpec("double", double, ("extract",)), +]) + +report = px.run(graph, strategy="sequential") +print(report["double"]) # [2, 4, 6] +``` + +## 核心概念 + +### TaskSpec —— 任务描述 + +`TaskSpec` 是不可变的任务描述符,是唯一需要配置的东西: + +```python +px.TaskSpec( + name="fetch_user", # 唯一标识 + fn=fetch_user, # 同步或异步函数 + depends_on=("auth",), # 依赖的任务名 + args=(uid,), # 静态位置参数(追加在注入参数后) + kwargs={"timeout": 30}, # 静态关键字参数 + retries=3, # 失败重试次数(0 = 仅一次) + timeout=30.0, # 超时秒数(None = 不限制) + tags=("api", "user"), # 自由标签,用于子图过滤 +) +``` + +### Graph —— DAG 构建 + +```python +graph = px.Graph.from_specs([...]) # 整批校验(推荐) +# 或增量构建 +graph = px.Graph() +graph.add(px.TaskSpec("a", fn_a)) +graph.add(px.TaskSpec("b", fn_b, ("a",))) + +graph.validate() # 显式校验(环检测) +graph.layers() # 拓扑分层 +graph.to_mermaid() # Mermaid 可视化 +graph.describe() # 人类可读摘要 +graph.subgraph(("api",)) # 按标签切片 +graph.subgraph_by_names(("a", "b")) # 按名称切片 +``` + +### run —— 执行 + +```python +report = px.run( + graph, + strategy="async", # sequential | thread | async + max_workers=8, # thread 策略的线程池大小 + dry_run=False, # True = 仅打印计划 + on_event=callback, # 状态转换回调 + state=px.JSONBackend("state.json"), # 断点续跑后端 +) +``` + +### RunReport —— 结果 + +```python +report["task_name"] # 任务返回值 +report.result_of("task_name") # 完整 TaskResult +report.success # 整体是否成功 +report.summary() # 统计字典 +report.failed_tasks() # 失败任务名列表 +report.describe() # 人类可读报告 +``` + +## 上下文注入规则 + +按顺序求值: + +1. **标注为 `Context`** 的参数 → 接收完整上游结果映射 +2. **名称匹配依赖** 的参数 → 接收该依赖的结果 +3. **`**kwargs`** 参数 → 接收所有依赖结果(dict) +4. **`TaskSpec.args` / `kwargs`** → 为非依赖参数提供静态值 + +```python +from typing import Any, Dict + +def aggregate(ctx: px.Context) -> Dict[str, Any]: + """ctx 包含所有 depends_on 任务的返回值。""" + return dict(ctx) + +def merge(fetch_a: str, fetch_b: str) -> str: + """fetch_a / fetch_b 自动注入。""" + return fetch_a + fetch_b + +def fetch_user(uid: int) -> dict: # uid 来自 TaskSpec.args + ... +``` + +## 执行策略对比 + +| 策略 | 并发模型 | 适用场景 | 同步任务 | 异步任务 | +|------|---------|---------|---------|---------| +| `sequential` | 串行 | 调试、CPU 密集 | 直接调用 | 事件循环 | +| `thread` | 线程池 | I/O 密集同步 | 线程池 | 不支持 | +| `async` | 事件循环 | I/O 密集异步 | 卸载到线程池 | 事件循环 | + +所有策略都遵循 `retries`、`timeout`、上下文注入、状态后端,并发出 `TaskEvent`。 + +## 示例 + +仓库 `examples/` 目录包含完整示例: + +- [`etl_pipeline.py`](examples/etl_pipeline.py) —— ETL 流水线(sequential) +- [`parallel_run.py`](examples/parallel_run.py) —— 并行执行对比(thread vs sequential) +- [`async_aggregation.py`](examples/async_aggregation.py) —— 异步聚合 + Context 注入 + +运行: + +```bash +python examples/etl_pipeline.py +python examples/parallel_run.py +python examples/async_aggregation.py +``` + +## 断点续跑 + +```python +from pyflowx import JSONBackend + +# 第一次运行:成功结果写入 state.json +backend = JSONBackend("state.json") +report = px.run(graph, strategy="sequential", state=backend) + +# 第二次运行:已缓存任务自动跳过 +report = px.run(graph, strategy="sequential", state=backend) +# report.results 中缓存任务状态为 SKIPPED +``` + +## 错误处理 + +所有错误都是 `PyFlowXError` 的子类: + +| 错误 | 触发时机 | +|------|---------| +| `DuplicateTaskError` | 任务名重复注册 | +| `MissingDependencyError` | 依赖了不存在的任务 | +| `CycleError` | 依赖图存在环 | +| `TaskFailedError` | 任务耗尽重试后仍失败 | +| `TaskTimeoutError` | 任务超时 | +| `InjectionError` | 上下文注入无法满足签名 | +| `StorageError` | 状态后端持久化失败 | + +```python +try: + report = px.run(graph, strategy="async") +except px.TaskFailedError as exc: + print(f"{exc.task} 失败: {exc.cause}(尝试 {exc.attempts} 次)") +except px.PyFlowXError: + # 捕获整个错误家族 + raise +``` + +## 与其他库对比 + +| 特性 | PyFlowX | Airflow | Prefect | Dask | +|------|---------|---------|---------|------| +| 零样板 | 参数名即依赖 | 装饰器 + XCom | 装饰器 | 装饰器 | +| 运行时依赖 | 仅标准库 | 重型 | 中型 | 中型 | +| 类型安全 | mypy strict | 弱 | 中 | 中 | +| 异步原生 | 是 | 否 | 部分 | 否 | +| 断点续跑 | 内置 | 需配置 | 需配置 | 需配置 | +| 学习曲线 | 极低 | 高 | 中 | 中 | +| 适用规模 | 单机 | 集群 | 单机/集群 | 集群 | + +PyFlowX 专注于**单机 DAG 调度**的极致简洁,适合 ETL、数据处理、CI 流水线等场景。 + +## 开发 + +```bash +# 安装开发依赖 +uv sync --extra dev + +# 运行测试(含覆盖率) +uv run pytest --cov=pyflowx --cov-fail-under=100 + +# 类型检查 +uv run mypy + +# 代码风格 +uv run ruff check src tests examples +uv run ruff format --check src tests examples +``` + +## 许可证 + +MIT diff --git a/pyproject.toml b/pyproject.toml index fac2794..cfdd930 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,7 +73,7 @@ dev = ["pyflowx[dev]"] [tool.coverage.run] branch = true -concurrency = ["greenlet", "thread"] +concurrency = ["thread"] source = ["pyflowx"] [tool.coverage.report] diff --git a/src/pyflowx/executors.py b/src/pyflowx/executors.py index 3cc6fd9..be33c4b 100644 --- a/src/pyflowx/executors.py +++ b/src/pyflowx/executors.py @@ -1,16 +1,15 @@ -"""Executors and the public :func:`run` entry point. +"""执行器与公共 :func:`run` 入口。 -Three execution strategies share a common layer-by-layer driver: +三种执行策略共享一个逐层驱动器: -* ``sequential`` — deterministic, one task at a time. Best for debugging. -* ``thread`` — layer-internal concurrency via a thread pool. Best for - I/O-bound sync tasks. -* ``async`` — layer-internal concurrency via ``asyncio.gather``. - Sync tasks are offloaded to a thread pool; async tasks - run on the event loop. Best for I/O-bound async tasks. +* ``sequential`` —— 确定性、一次一个任务。最适合调试。 +* ``thread`` —— 通过线程池实现层内并发。最适合 I/O 密集型同步任务。 +* ``async`` —— 通过 ``asyncio.gather`` 实现层内并发。同步任务被 + 卸载到线程池;异步任务运行在事件循环上。最适合 + I/O 密集型异步任务。 -All three honour ``retries``, ``timeout``, context injection, state -backends (resume), and emit :class:`~pyflowx.task.TaskEvent` for observers. +三者都遵循 ``retries``、``timeout``、上下文注入、状态后端(续跑), +并向观察者发出 :class:`~pyflowx.task.TaskEvent`。 """ from __future__ import annotations @@ -31,15 +30,15 @@ from .task import TaskEvent, TaskResult, TaskSpec, TaskStatus logger = logging.getLogger("pyflowx") -# Observer callback type. +# 观察者回调类型。 EventCallback = Callable[[TaskEvent], None] -# Strategy selector literal. +# 策略选择字面量。 Strategy = str # "sequential" | "thread" | "async" def _is_async_fn(spec: TaskSpec[object]) -> bool: - """True if ``spec.fn`` is a coroutine function.""" + """判断 ``spec.fn`` 是否为协程函数。""" return inspect.iscoroutinefunction(spec.fn) @@ -47,7 +46,7 @@ def _emit( on_event: Optional[EventCallback], result: TaskResult[object], ) -> None: - """Fire an observer event if a callback is registered.""" + """若注册了回调则触发一个观察者事件。""" if on_event is None: return on_event( @@ -91,7 +90,7 @@ def _run_sync_with_retry( context: Mapping[str, Any], layer_idx: Optional[int], ) -> TaskResult[object]: - """Execute a sync task with retries; return a populated TaskResult.""" + """执行同步任务并带重试;返回填充好的 TaskResult。""" result: TaskResult[object] = TaskResult(spec=spec) result.started_at = datetime.now() max_attempts = spec.retries + 1 @@ -104,7 +103,7 @@ def _run_sync_with_retry( result.status = TaskStatus.SUCCESS result.finished_at = datetime.now() return result - except Exception as exc: # noqa: BLE001 - user code may raise anything + except Exception as exc: # noqa: BLE001 - 用户代码可能抛任何异常 result.error = exc if result.attempts >= max_attempts: _finalize_failure(result, layer_idx) # pragma: no cover @@ -117,7 +116,7 @@ async def _run_async_with_retry( context: Mapping[str, Any], layer_idx: Optional[int], ) -> TaskResult[object]: - """Execute a task (sync or async) on the event loop with retries.""" + """在事件循环上执行任务(同步或异步)并带重试。""" result: TaskResult[object] = TaskResult(spec=spec) result.started_at = datetime.now() max_attempts = spec.retries + 1 @@ -134,8 +133,10 @@ async def _run_async_with_retry( else: result.value = await coro else: - # Offload sync work to a thread so the event loop stays alive. - fn_call: Callable[[], Any] = lambda: spec.fn(*args, **kwargs) + # 将同步工作卸载到线程,保持事件循环存活。 + def fn_call() -> Any: + return spec.fn(*args, **kwargs) + if spec.timeout is not None: result.value = await asyncio.wait_for( loop.run_in_executor(None, fn_call), timeout=spec.timeout @@ -164,13 +165,13 @@ async def _run_async_with_retry( # ---------------------------------------------------------------------- # -# Layer driver +# 层驱动器 # ---------------------------------------------------------------------- # def _build_context( spec: TaskSpec[object], global_context: Mapping[str, Any], ) -> Mapping[str, Any]: - """Restrict the global context to this task's dependencies.""" + """将全局上下文限制为本任务的依赖。""" return { dep: global_context[dep] for dep in spec.depends_on if dep in global_context } @@ -185,7 +186,7 @@ def _execute_layer_sequential( layer_idx: int, on_event: Optional[EventCallback], ) -> None: - """Run a layer's tasks one by one.""" + """逐个运行某层的任务。""" for name in layer: spec = graph.spec(name) if backend.has(name): @@ -213,8 +214,8 @@ def _execute_layer_threaded( on_event: Optional[EventCallback], max_workers: int, ) -> None: - """Run a layer's tasks concurrently in a thread pool.""" - # First, satisfy cached tasks synchronously. + """在线程池中并发运行某层的任务。""" + # 先同步满足已缓存任务。 to_run: List[str] = [] for name in layer: if backend.has(name): @@ -235,14 +236,14 @@ def _execute_layer_threaded( future_to_name: Dict[concurrent.futures.Future[TaskResult[object]], str] = {} for name in to_run: spec = graph.spec(name) - # Snapshot the context for this task to avoid races. + # 为本任务快照上下文以避免竞态。 task_ctx = _build_context(spec, context) fut = pool.submit(_run_sync_with_retry, spec, task_ctx, layer_idx) future_to_name[fut] = name for fut in concurrent.futures.as_completed(future_to_name): name = future_to_name[fut] - result = fut.result() # raises TaskFailedError on failure + result = fut.result() # 失败时抛出 TaskFailedError context[name] = result.value backend.save(name, result.value) report.results[name] = result @@ -258,7 +259,7 @@ async def _execute_layer_async( layer_idx: int, on_event: Optional[EventCallback], ) -> None: - """Run a layer's tasks concurrently on the event loop.""" + """在事件循环上并发运行某层的任务。""" to_run: List[str] = [] for name in layer: if backend.has(name): @@ -290,7 +291,7 @@ async def _execute_layer_async( # ---------------------------------------------------------------------- # -# Public API +# 公共 API # ---------------------------------------------------------------------- # def run( graph: Graph, @@ -301,32 +302,32 @@ def run( on_event: Optional[EventCallback] = None, state: Optional[StateBackend] = None, ) -> RunReport: - """Execute a graph and return a :class:`RunReport`. + """执行图并返回 :class:`RunReport`。 - Parameters - ---------- + 参数 + ---- graph: - The validated :class:`Graph` to execute. + 待执行的已校验 :class:`Graph`。 strategy: - ``"sequential"`` (default), ``"thread"``, or ``"async"``. + ``"sequential"``(默认)、``"thread"`` 或 ``"async"``。 max_workers: - Thread-pool size for ``"thread"``. Defaults to ``min(32, len(layer))``. + ``"thread"`` 的线程池大小。默认 ``min(32, len(layer))``。 dry_run: - If ``True``, print the execution plan (layers + injection) and - return an empty report without executing anything. + 若为 ``True``,打印执行计划(层 + 注入)并返回空报告,不执行 + 任何任务。 on_event: - Optional callback invoked on every status transition. + 可选回调,在每次状态转换时调用。 state: - Optional :class:`StateBackend` for resumable runs. Defaults to an - in-memory backend (no persistence across processes). + 可选 :class:`StateBackend`,用于断点续跑。默认为内存后端 + (不跨进程持久化)。 - Raises - ------ + 抛出 + ---- ValueError - If ``strategy`` is not recognised. + ``strategy`` 不被识别时。 TaskFailedError - If any task fails after exhausting retries. The run aborts at the - failing layer; tasks in later layers are not attempted. + 任何任务耗尽重试后仍失败时。运行在失败层中止;后续层的任务 + 不会被执行。 """ if strategy not in ("sequential", "thread", "async"): raise ValueError( @@ -361,7 +362,7 @@ def run( def _print_dry_run(graph: Graph, layers: List[List[str]]) -> None: - """Print the execution plan without running anything.""" + """打印执行计划但不运行任何任务。""" print(f"Dry run: {len(graph)} tasks, {len(layers)} layers") for idx, layer in enumerate(layers, 1): print(f" Layer {idx}: {layer}") diff --git a/src/pyflowx/graph.py b/src/pyflowx/graph.py index bd91d91..d072f08 100644 --- a/src/pyflowx/graph.py +++ b/src/pyflowx/graph.py @@ -14,14 +14,14 @@ from .errors import CycleError, DuplicateTaskError, MissingDependencyError from .task import TaskSpec # graphlib 自 3.9 起进入标准库;3.8 回退到 backport。 -if sys.version_info >= (3, 9): +if sys.version_info >= (3, 9): # pragma: no cover import graphlib _TopologicalSorter = graphlib.TopologicalSorter -else: # pragma: no cover - 仅在 3.8 上执行 - import graphlib # type: ignore[no-redef] +else: # pragma: no cover + import graphlib # type: ignore[import-untyped] # pragma: no cover - _TopologicalSorter = graphlib.TopologicalSorter + _TopologicalSorter = graphlib.TopologicalSorter # pragma: no cover class Graph: diff --git a/tests/test_errors.py b/tests/test_errors.py index d684d41..433a4f8 100644 --- a/tests/test_errors.py +++ b/tests/test_errors.py @@ -2,9 +2,6 @@ from __future__ import annotations -import pytest - -import pyflowx as px from pyflowx.errors import ( CycleError, DuplicateTaskError, diff --git a/tests/test_task.py b/tests/test_task.py index 5a38878..8c70549 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -6,7 +6,6 @@ from datetime import datetime import pytest -import pyflowx as px from pyflowx.task import TaskResult, TaskSpec, TaskStatus @@ -51,9 +50,7 @@ def test_task_result_duration_computed() -> None: spec: TaskSpec[None] = TaskSpec("a", _fn) start = datetime(2024, 1, 1, 0, 0, 0) end = datetime(2024, 1, 1, 0, 0, 5) - result: TaskResult[None] = TaskResult( - spec=spec, started_at=start, finished_at=end - ) + result: TaskResult[None] = TaskResult(spec=spec, started_at=start, finished_at=end) assert result.duration == 5.0