chore: release v0.1.1 and add example demos
1. 新增3个官方示例代码:ETL流水线、并行执行、异步聚合 2. 添加__main__.py入口和示例包导出 3. 补充项目依赖声明和控制台脚本配置 4. 更新uv.lock和包版本号至0.1.1
This commit is contained in:
@@ -0,0 +1,9 @@
|
||||
from pyflowx.examples.async_aggregation import main as async_aggregation_main
|
||||
from pyflowx.examples.etl_pipeline import main as etl_pipeline_main
|
||||
from pyflowx.examples.parallel_run import main as parallel_run_main
|
||||
|
||||
|
||||
def main():
|
||||
async_aggregation_main()
|
||||
etl_pipeline_main()
|
||||
parallel_run_main()
|
||||
@@ -0,0 +1,58 @@
|
||||
"""Example 3: async aggregation with static args and Context injection.
|
||||
|
||||
Shows:
|
||||
* async task functions executed with strategy="async".
|
||||
* static positional args (TaskSpec.args) for parameterised tasks.
|
||||
* Context annotation to receive the full upstream result mapping.
|
||||
* on_event callback for real-time progress.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import pyflowx as px
|
||||
|
||||
|
||||
async def fetch_user(uid: int) -> dict:
|
||||
await asyncio.sleep(0.2)
|
||||
return {"id": uid, "name": f"User{uid}"}
|
||||
|
||||
|
||||
async def fetch_posts(uid: int) -> List[int]:
|
||||
await asyncio.sleep(0.2)
|
||||
return [uid, uid + 1]
|
||||
|
||||
|
||||
# Context annotation → receives the full mapping of upstream results.
|
||||
def aggregate(ctx: px.Context) -> Dict[str, Any]:
|
||||
return dict(ctx)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
graph = px.Graph.from_specs(
|
||||
[
|
||||
# Static positional args parameterise the same function twice.
|
||||
px.TaskSpec("fetch_user", fetch_user, args=(1,)),
|
||||
px.TaskSpec("fetch_posts", fetch_posts, args=(1,)),
|
||||
px.TaskSpec("aggregate", aggregate, ("fetch_user", "fetch_posts")),
|
||||
]
|
||||
)
|
||||
|
||||
print("=== Dry run ===")
|
||||
px.run(graph, strategy="async", dry_run=True)
|
||||
|
||||
events: List[px.TaskEvent] = []
|
||||
print("\n=== Async execution ===")
|
||||
report = px.run(graph, strategy="async", on_event=events.append)
|
||||
|
||||
for ev in events:
|
||||
print(f" event: {ev.task} -> {ev.status.value}")
|
||||
|
||||
print(f"\naggregate = {report['aggregate']}")
|
||||
print(report.describe())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,81 @@
|
||||
"""Example 1: ETL pipeline (sequential strategy).
|
||||
|
||||
Demonstrates the core PyFlowX workflow:
|
||||
* Define tasks as plain functions.
|
||||
* Declare the DAG with a list of TaskSpec.
|
||||
* Parameter names == dependency names → automatic context injection,
|
||||
no wrappers needed (contrast with flowweaver's get_task_result boilerplate).
|
||||
* dry_run to preview, then execute and read typed results from RunReport.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List
|
||||
|
||||
import pyflowx as px
|
||||
|
||||
# --- task functions: pure, testable, no framework coupling ------------- #
|
||||
|
||||
|
||||
def extract_customers() -> List[dict]:
|
||||
return [
|
||||
{"id": "C001", "name": "Alice"},
|
||||
{"id": "C002", "name": "Bob"},
|
||||
]
|
||||
|
||||
|
||||
def extract_orders() -> List[dict]:
|
||||
return [
|
||||
{"id": "O001", "customer_id": "C001", "amount": 150.0},
|
||||
{"id": "O002", "customer_id": "C002", "amount": 200.5},
|
||||
]
|
||||
|
||||
|
||||
# Parameter names match dependency names → automatic injection.
|
||||
def transform(
|
||||
extract_customers: List[dict],
|
||||
extract_orders: List[dict],
|
||||
) -> List[dict]:
|
||||
cmap = {c["id"]: c for c in extract_customers}
|
||||
return [
|
||||
{**o, "customer_name": cmap[o["customer_id"]]["name"]}
|
||||
for o in extract_orders
|
||||
if o["customer_id"] in cmap
|
||||
]
|
||||
|
||||
|
||||
def load(transform: List[dict]) -> int:
|
||||
print(f" loaded {len(transform)} records")
|
||||
return len(transform)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
graph = px.Graph.from_specs(
|
||||
[
|
||||
px.TaskSpec("extract_customers", extract_customers, tags=("extract",)),
|
||||
px.TaskSpec("extract_orders", extract_orders, tags=("extract",)),
|
||||
px.TaskSpec(
|
||||
"transform",
|
||||
transform,
|
||||
("extract_customers", "extract_orders"),
|
||||
tags=("transform",),
|
||||
),
|
||||
px.TaskSpec("load", load, ("transform",), retries=1, tags=("load",)),
|
||||
]
|
||||
)
|
||||
|
||||
print("=== Execution plan ===")
|
||||
print(graph.describe())
|
||||
|
||||
print("\n=== Dry run (no execution) ===")
|
||||
px.run(graph, strategy="sequential", dry_run=True)
|
||||
|
||||
print("\n=== Sequential execution ===")
|
||||
report = px.run(graph, strategy="sequential")
|
||||
print(report.describe())
|
||||
print(f"\nload result = {report['load']}")
|
||||
print(f"summary = {report.summary()}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,59 @@
|
||||
"""Example 2: parallel execution (thread strategy).
|
||||
|
||||
Same DAG run with sequential vs. thread strategy to show layer-internal
|
||||
parallelism. Tasks within a layer run concurrently; layers are barriers.
|
||||
|
||||
Layer 1: [fetch_a, fetch_b] (parallel)
|
||||
Layer 2: [merge] (waits for both)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import pyflowx as px
|
||||
|
||||
|
||||
def fetch_a() -> str:
|
||||
time.sleep(0.5)
|
||||
return "a"
|
||||
|
||||
|
||||
def fetch_b() -> str:
|
||||
time.sleep(0.5)
|
||||
return "b"
|
||||
|
||||
|
||||
def merge(fetch_a: str, fetch_b: str) -> str:
|
||||
return fetch_a + fetch_b
|
||||
|
||||
|
||||
def main() -> None:
|
||||
graph = px.Graph.from_specs(
|
||||
[
|
||||
px.TaskSpec("fetch_a", fetch_a),
|
||||
px.TaskSpec("fetch_b", fetch_b),
|
||||
px.TaskSpec("merge", merge, ("fetch_a", "fetch_b")),
|
||||
]
|
||||
)
|
||||
|
||||
print("=== Mermaid diagram ===")
|
||||
print(graph.to_mermaid("LR"))
|
||||
|
||||
print("\n=== Sequential (expect ~1.0s) ===")
|
||||
start = time.time()
|
||||
report_seq = px.run(graph, strategy="sequential")
|
||||
t_seq = time.time() - start
|
||||
print(f" result={report_seq['merge']} time={t_seq:.2f}s")
|
||||
|
||||
print("\n=== Threaded (expect ~0.5s) ===")
|
||||
start = time.time()
|
||||
report_thr = px.run(graph, strategy="thread", max_workers=2)
|
||||
t_thr = time.time() - start
|
||||
print(f" result={report_thr['merge']} time={t_thr:.2f}s")
|
||||
|
||||
print(f"\nspeedup = {t_seq / t_thr:.2f}x")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user