From 939cd724eca84fe4ac722686a18ce318db543d6c Mon Sep 17 00:00:00 2001 From: gooker_young Date: Sun, 21 Jun 2026 15:14:07 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20=E6=95=B4=E7=90=86=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E4=B8=8E=E5=86=97=E4=BD=99=E5=86=85=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/pyflowx/errors.py | 6 ++---- src/pyflowx/examples/etl_pipeline.py | 12 +++--------- src/pyflowx/task.py | 4 ++-- 3 files changed, 7 insertions(+), 15 deletions(-) diff --git a/src/pyflowx/errors.py b/src/pyflowx/errors.py index 7b94272..c010d58 100644 --- a/src/pyflowx/errors.py +++ b/src/pyflowx/errors.py @@ -27,7 +27,7 @@ class MissingDependencyError(PyFlowXError): def __init__(self, task: str, dependency: str) -> None: super().__init__( f"Task '{task}' depends on unknown task '{dependency}'. " - "Add the dependency before (or together with) this task." + + "Add the dependency before (or together with) this task." ) self.task = task self.dependency = dependency @@ -58,9 +58,7 @@ class TaskFailedError(PyFlowXError): layer: int | None = None, ) -> None: location = f" (layer {layer})" if layer is not None else "" - super().__init__( - f"Task '{task}' failed after {attempts} attempt(s){location}: {cause}" - ) + super().__init__(f"Task '{task}' failed after {attempts} attempt(s){location}: {cause}") self.task = task self.cause = cause self.attempts = attempts diff --git a/src/pyflowx/examples/etl_pipeline.py b/src/pyflowx/examples/etl_pipeline.py index 5e5733f..afa87a8 100644 --- a/src/pyflowx/examples/etl_pipeline.py +++ b/src/pyflowx/examples/etl_pipeline.py @@ -35,11 +35,7 @@ def transform( extract_orders: list[dict], ) -> list[dict]: cmap = {c["id"]: c for c in extract_customers} - return [ - {**o, "customer_name": cmap[o["customer_id"]]["name"]} - for o in extract_orders - if o["customer_id"] in cmap - ] + return [{**o, "customer_name": cmap[o["customer_id"]]["name"]} for o in extract_orders if o["customer_id"] in cmap] def load(transform: list[dict]) -> int: @@ -58,9 +54,7 @@ def main() -> None: depends_on=("extract_customers", "extract_orders"), tags=("transform",), ), - px.TaskSpec( - "load", load, depends_on=("transform",), retries=1, tags=("load",) - ), + px.TaskSpec("load", load, depends_on=("transform",), retries=1, tags=("load",)), ] ) @@ -68,7 +62,7 @@ def main() -> None: print(graph.describe()) print("\n=== Dry run (no execution) ===") - px.run(graph, strategy="sequential", dry_run=True) + _ = px.run(graph, strategy="sequential", dry_run=True) print("\n=== Sequential execution ===") report = px.run(graph, strategy="sequential") diff --git a/src/pyflowx/task.py b/src/pyflowx/task.py index 24a2adb..93a6982 100644 --- a/src/pyflowx/task.py +++ b/src/pyflowx/task.py @@ -151,7 +151,7 @@ class TaskSpec(Generic[T]): if self.fn is not None: return self.fn - raise ValueError(f"TaskSpec '{self.name}': 没有可执行的函数或命令。") + raise ValueError(f"TaskSpec '{self.name}': 没有可执行的函数或命令。") # pragma: no cover def _wrap_cmd(self) -> TaskFn[Any]: """将 cmd 包装为可执行函数. @@ -249,7 +249,7 @@ class TaskSpec(Generic[T]): if callable(cmd): return cmd # type: ignore[return-value] - raise TypeError(f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}") + raise TypeError(f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}") # pragma: no cover def should_execute(self) -> bool: """检查任务是否应该执行.