Files
pyflowx/src/pyflowx/examples/async_aggregation.py
T
zhou fd282db28f refactor: 整理代码格式与项目结构,修复命令检查bug
1. 重构多处列表展开写法,统一代码格式风格
2. 修复executors.py中命令不存在时的类型判断bug
3. 删除废弃的envlinux.py并替换为envdev.py,更新CLI入口配置
4. 为storage.py的后端方法添加override装饰器
5. 移除空的cli/__init__.py冗余导入
6. 更新pyproject.toml依赖与配置项
7. 精简测试用例代码
2026-06-26 21:45:06 +08:00

57 lines
1.5 KiB
Python

"""Example 3: async aggregation with static args and Context injection.
Shows:
* async task functions executed with strategy="async".
* static positional args (TaskSpec.args) for parameterised tasks.
* Context annotation to receive the full upstream result mapping.
* on_event callback for real-time progress.
"""
from __future__ import annotations
import asyncio
from typing import Any
import pyflowx as px
async def fetch_user(uid: int) -> dict[str, Any]:
    """Return a placeholder user record for ``uid``.

    Awaits a 0.2 s ``asyncio.sleep`` first (presumably standing in for real
    I/O latency), then builds the record locally.

    Args:
        uid: Identifier echoed back under ``"id"`` and embedded in the name.

    Returns:
        A dict of the form ``{"id": uid, "name": "User<uid>"}``.
    """
    await asyncio.sleep(0.2)
    record: dict[str, Any] = dict(id=uid, name=f"User{uid}")
    return record
async def fetch_posts(uid: int) -> list[int]:
    """Return two placeholder post ids derived from ``uid``.

    Awaits a 0.2 s ``asyncio.sleep`` first (presumably standing in for real
    I/O latency).

    Args:
        uid: Value used as the first post id.

    Returns:
        The two-element list ``[uid, uid + 1]``.
    """
    await asyncio.sleep(0.2)
    first_post = uid
    second_post = uid + 1
    return [first_post, second_post]
# The ``px.Context`` annotation is what requests the full mapping of upstream
# results as the argument (see the module docstring).
def aggregate(ctx: px.Context) -> dict[str, Any]:
    """Copy the received context into a plain ``dict`` and return it.

    Args:
        ctx: The context object handed to this task; it is passed straight
            to ``dict(...)``.

    Returns:
        A new ``dict`` built from ``ctx``.
    """
    snapshot: dict[str, Any] = dict(ctx)
    return snapshot
def main() -> None:
    """Build the three-task example graph, dry-run it, then run it async.

    Prints a dry-run section, then an execution section listing every
    collected event, the value stored under ``"aggregate"`` in the report,
    and finally the report's ``describe()`` output.
    """
    # Each fetch task receives its ``uid`` through static positional args;
    # ``aggregate`` declares both fetch tasks as dependencies.
    specs = [
        px.TaskSpec("fetch_user", fetch_user, args=(1,)),
        px.TaskSpec("fetch_posts", fetch_posts, args=(1,)),
        px.TaskSpec(
            "aggregate",
            aggregate,
            depends_on=("fetch_user", "fetch_posts"),
        ),
    ]
    graph = px.Graph.from_specs(specs)

    print("=== Dry run ===")
    # The dry-run return value is not needed here, so it is discarded.
    _ = px.run(graph, strategy="async", dry_run=True)

    # Events are appended by the callback during the run and printed after it.
    collected: list[px.TaskEvent] = []
    print("\n=== Async execution ===")
    report = px.run(graph, strategy="async", on_event=collected.append)
    for event in collected:
        print(f" event: {event.task} -> {event.status.value}")

    aggregated = report["aggregate"]
    print(f"\naggregate = {aggregated}")
    print(report.describe())
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()