Files
pyflowx/examples/parallel_run.py
T
zhou 8b7777d936 feat: 初始化PyFlowX轻量级DAG任务调度库
实现完整的DAG任务调度核心功能,包括:
1.  支持同步/异步/线程三种执行策略
2.  自动上下文注入,无需手动绑定任务依赖
3.  内置状态后端,支持断点续跑
4.  提供完整的测试用例与示例代码
5.  添加CI/CD配置与发布流程
2026-06-20 10:41:33 +08:00

60 lines
1.4 KiB
Python

"""Example 2: parallel execution (thread strategy).
Runs the same DAG with the sequential strategy and with the thread strategy
to show intra-layer parallelism: tasks within a layer run concurrently, and
each layer acts as a barrier for the next one.

    Layer 1: [fetch_a, fetch_b]  (parallel)
    Layer 2: [merge]             (waits for both)
"""
from __future__ import annotations
import time
import pyflowx as px
def fetch_a() -> str:
    """Sleep for 0.5 seconds, then return the string ``"a"``.

    The sleep stands in for slow work so that the timing difference between
    execution strategies is visible when the example is run.
    """
    delay_seconds = 0.5
    time.sleep(delay_seconds)
    payload = "a"
    return payload
def fetch_b() -> str:
    """Sleep for 0.5 seconds, then return the string ``"b"``.

    The sleep stands in for slow work so that the timing difference between
    execution strategies is visible when the example is run.
    """
    delay_seconds = 0.5
    time.sleep(delay_seconds)
    payload = "b"
    return payload
def merge(fetch_a: str, fetch_b: str) -> str:
    """Concatenate the two upstream results, ``fetch_a`` first.

    The parameter names equal the task names ``"fetch_a"`` and ``"fetch_b"``
    that the ``"merge"`` task lists as its dependencies.
    NOTE(review): presumably pyflowx injects those results by parameter
    name -- confirm against the library before renaming either parameter.
    """
    combined = fetch_a + fetch_b
    return combined
def main() -> None:
    """Build the three-task graph and time it under two strategies.

    Prints the graph's Mermaid rendering, then runs the graph once with
    ``strategy="sequential"`` and once with ``strategy="thread"`` (two
    workers). For each run it prints the ``"merge"`` entry of the returned
    report and the elapsed time, and finally the ratio of the two times.
    """
    graph = px.Graph.from_specs(
        [
            px.TaskSpec("fetch_a", fetch_a),
            px.TaskSpec("fetch_b", fetch_b),
            # The tuple names the tasks "merge" is declared to depend on.
            px.TaskSpec("merge", merge, ("fetch_a", "fetch_b")),
        ]
    )

    print("=== Mermaid diagram ===")
    # NOTE(review): "LR" is presumably the Mermaid left-to-right direction.
    print(graph.to_mermaid("LR"))

    # Intervals are measured with perf_counter(): it is monotonic and
    # high-resolution, whereas time.time() follows the wall clock and can
    # jump (NTP or manual adjustment), which would distort the speedup.
    print("\n=== Sequential (expect ~1.0s) ===")
    start = time.perf_counter()
    report_seq = px.run(graph, strategy="sequential")
    t_seq = time.perf_counter() - start
    print(f" result={report_seq['merge']} time={t_seq:.2f}s")

    print("\n=== Threaded (expect ~0.5s) ===")
    start = time.perf_counter()
    report_thr = px.run(graph, strategy="thread", max_workers=2)
    t_thr = time.perf_counter() - start
    print(f" result={report_thr['merge']} time={t_thr:.2f}s")

    print(f"\nspeedup = {t_seq / t_thr:.2f}x")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()