chore: 整理代码格式与冗余内容

This commit is contained in:
2026-06-21 15:14:07 +08:00
parent 5ddfe8510c
commit 939cd724ec
3 changed files with 7 additions and 15 deletions
+2 -4
View File
@@ -27,7 +27,7 @@ class MissingDependencyError(PyFlowXError):
def __init__(self, task: str, dependency: str) -> None: def __init__(self, task: str, dependency: str) -> None:
super().__init__( super().__init__(
f"Task '{task}' depends on unknown task '{dependency}'. " f"Task '{task}' depends on unknown task '{dependency}'. "
"Add the dependency before (or together with) this task." + "Add the dependency before (or together with) this task."
) )
self.task = task self.task = task
self.dependency = dependency self.dependency = dependency
@@ -58,9 +58,7 @@ class TaskFailedError(PyFlowXError):
layer: int | None = None, layer: int | None = None,
) -> None: ) -> None:
location = f" (layer {layer})" if layer is not None else "" location = f" (layer {layer})" if layer is not None else ""
super().__init__( super().__init__(f"Task '{task}' failed after {attempts} attempt(s){location}: {cause}")
f"Task '{task}' failed after {attempts} attempt(s){location}: {cause}"
)
self.task = task self.task = task
self.cause = cause self.cause = cause
self.attempts = attempts self.attempts = attempts
+3 -9
View File
@@ -35,11 +35,7 @@ def transform(
extract_orders: list[dict], extract_orders: list[dict],
) -> list[dict]: ) -> list[dict]:
cmap = {c["id"]: c for c in extract_customers} cmap = {c["id"]: c for c in extract_customers}
return [ return [{**o, "customer_name": cmap[o["customer_id"]]["name"]} for o in extract_orders if o["customer_id"] in cmap]
{**o, "customer_name": cmap[o["customer_id"]]["name"]}
for o in extract_orders
if o["customer_id"] in cmap
]
def load(transform: list[dict]) -> int: def load(transform: list[dict]) -> int:
@@ -58,9 +54,7 @@ def main() -> None:
depends_on=("extract_customers", "extract_orders"), depends_on=("extract_customers", "extract_orders"),
tags=("transform",), tags=("transform",),
), ),
px.TaskSpec( px.TaskSpec("load", load, depends_on=("transform",), retries=1, tags=("load",)),
"load", load, depends_on=("transform",), retries=1, tags=("load",)
),
] ]
) )
@@ -68,7 +62,7 @@ def main() -> None:
print(graph.describe()) print(graph.describe())
print("\n=== Dry run (no execution) ===") print("\n=== Dry run (no execution) ===")
px.run(graph, strategy="sequential", dry_run=True) _ = px.run(graph, strategy="sequential", dry_run=True)
print("\n=== Sequential execution ===") print("\n=== Sequential execution ===")
report = px.run(graph, strategy="sequential") report = px.run(graph, strategy="sequential")
+2 -2
View File
@@ -151,7 +151,7 @@ class TaskSpec(Generic[T]):
if self.fn is not None: if self.fn is not None:
return self.fn return self.fn
raise ValueError(f"TaskSpec '{self.name}': 没有可执行的函数或命令。") raise ValueError(f"TaskSpec '{self.name}': 没有可执行的函数或命令。") # pragma: no cover
def _wrap_cmd(self) -> TaskFn[Any]: def _wrap_cmd(self) -> TaskFn[Any]:
"""将 cmd 包装为可执行函数. """将 cmd 包装为可执行函数.
@@ -249,7 +249,7 @@ class TaskSpec(Generic[T]):
if callable(cmd): if callable(cmd):
return cmd # type: ignore[return-value] return cmd # type: ignore[return-value]
raise TypeError(f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}") raise TypeError(f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}") # pragma: no cover
def should_execute(self) -> bool: def should_execute(self) -> bool:
"""检查任务是否应该执行. """检查任务是否应该执行.