From 983d47bd2e67937870d00356330a7abca8d7496a Mon Sep 17 00:00:00 2001 From: gooker_young Date: Sun, 21 Jun 2026 21:55:18 +0800 Subject: [PATCH] =?UTF-8?q?refactor(executors):=20=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=B7=B3=E8=BF=87=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E6=8F=90=E5=8F=96=E5=85=AC=E5=85=B1=E5=87=BD=E6=95=B0=E5=B9=B6?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 提取上游任务跳过检查和条件检查为公共工具函数 2. 重构同步和异步执行器的跳过判断逻辑,减少代码重复 3. 格式化gittool.py和测试文件的列表语法,提升可读性 --- src/pyflowx/cli/gittool.py | 60 +++++++------ src/pyflowx/executors.py | 170 +++++++++++++++++++++++-------------- tests/test_executors.py | 160 ++++++++++++++++++++-------------- 3 files changed, 235 insertions(+), 155 deletions(-) diff --git a/src/pyflowx/cli/gittool.py b/src/pyflowx/cli/gittool.py index 5b76b99..eb22a6b 100644 --- a/src/pyflowx/cli/gittool.py +++ b/src/pyflowx/cli/gittool.py @@ -31,16 +31,20 @@ def init_sub_dirs() -> None: sub_dirs = [subdir for subdir in Path.cwd().iterdir() if subdir.is_dir()] for subdir in sub_dirs: px.run( - px.Graph.from_specs([ - px.TaskSpec( - "init", - cmd=["git", "init"], - conditions=[not_has_git_repo], - cwd=str(subdir), - ), - px.TaskSpec("add", cmd=["git", "add", "."], depends_on=["init"], cwd=str(subdir)), - px.TaskSpec("commit", cmd=["git", "commit", "-m", "init commit"], depends_on=["add"], cwd=str(subdir)), - ]), + px.Graph.from_specs( + [ + px.TaskSpec( + "init", + cmd=["git", "init"], + conditions=[not_has_git_repo], + cwd=str(subdir), + ), + px.TaskSpec("add", cmd=["git", "add", "."], depends_on=["init"], cwd=str(subdir)), + px.TaskSpec( + "commit", cmd=["git", "commit", "-m", "init commit"], depends_on=["add"], cwd=str(subdir) + ), + ] + ), ) @@ -67,23 +71,29 @@ def main() -> None: description="Gittool - Git 执行工具.", graphs={ # 添加并提交 - "a": px.Graph.from_specs([ - px.TaskSpec("add", cmd=["git", "add", "."], conditions=[has_files]), - px.TaskSpec("commit", cmd=["git", "commit", "-m", "chore: update"], depends_on=["add"]), - ]), + "a": px.Graph.from_specs( + [ + px.TaskSpec("add", cmd=["git", "add", "."], conditions=[has_files]), + px.TaskSpec("commit", cmd=["git", "commit", "-m", "chore: update"], depends_on=["add"]), + ] + ), # 清理 - "c": px.Graph.from_specs([ - px.TaskSpec("clean", cmd=["git", "clean", "-xfd", *EXCLUDE_CMDS]), - px.TaskSpec("status", cmd=["git", "status", "--porcelain"], depends_on=["clean"]), - ]), + "c": px.Graph.from_specs( + [ + px.TaskSpec("clean", cmd=["git", "clean", "-xfd", *EXCLUDE_CMDS]), + px.TaskSpec("status", cmd=["git", "status", "--porcelain"], depends_on=["clean"]), + ] + ), # 初始化、添加并提交 - "i": px.Graph.from_specs([ - px.TaskSpec("init", cmd=["git", "init"], conditions=[not_has_git_repo]), - px.TaskSpec("add", cmd=["git", "add", "."], depends_on=["init"], conditions=[has_files]), - px.TaskSpec( - "commit", cmd=["git", "commit", "-m", "init commit"], depends_on=["add"], conditions=[has_files] - ), - ]), + "i": px.Graph.from_specs( + [ + px.TaskSpec("init", cmd=["git", "init"], conditions=[not_has_git_repo]), + px.TaskSpec("add", cmd=["git", "add", "."], depends_on=["init"], conditions=[has_files]), + px.TaskSpec( + "commit", cmd=["git", "commit", "-m", "init commit"], depends_on=["add"], conditions=[has_files] + ), + ] + ), # 初始化子目录 "isub": px.Graph.from_specs([isub]), # 推送 diff --git a/src/pyflowx/executors.py b/src/pyflowx/executors.py index d918371..9c28bba 100644 --- a/src/pyflowx/executors.py +++ b/src/pyflowx/executors.py @@ -87,6 +87,56 @@ def _finalize_failure( ) +def _check_upstream_skipped( + spec: TaskSpec[Any], + report: RunReport | None, +) -> tuple[bool, str | None]: + """检查上游任务是否被 SKIPPED。 + + Returns + ------- + tuple[bool, str | None] + (是否应该跳过, 跳过原因) + """ + if report is None: + return False, None + + for dep in spec.depends_on: + if dep in report.results and report.results[dep].status == TaskStatus.SKIPPED: + return True, f"上游任务 '{dep}' 被跳过" + return False, None + + +def _check_conditions_for_skip( + spec: TaskSpec[Any], +) -> str | None: + """检查任务条件是否满足,返回跳过原因(如果不满足)。 + + Returns + ------- + str | None + 跳过原因,如果条件满足则返回 None + """ + if spec.should_execute(): + return None + + # 检查是哪个条件不满足 + failed_conditions = [] + for condition in spec.conditions: + try: + if not condition(): + failed_conditions.append(condition.__name__ or "匿名条件") + except Exception: + failed_conditions.append(condition.__name__ or "匿名条件(执行错误)") + + if failed_conditions: + return f"条件不满足: {', '.join(failed_conditions)}" + elif spec.skip_if_missing and not spec._is_cmd_available(): + return f"命令不存在: {spec.cmd[0] if spec.cmd else 'unknown'}" + else: + return "条件不满足" + + def _run_sync_with_retry( spec: TaskSpec[Any], context: Mapping[str, Any], @@ -98,33 +148,20 @@ def _run_sync_with_retry( result: TaskResult[Any] = TaskResult(spec=spec) # 检查上游任务是否被 SKIPPED - if report is not None: - for dep in spec.depends_on: - if dep in report.results and report.results[dep].status == TaskStatus.SKIPPED: - result.status = TaskStatus.SKIPPED - result.finished_at = datetime.now() - result.reason = f"上游任务 '{dep}' 被跳过" - logger.info("task %r skipped (上游任务 %r 被跳过)", spec.name, dep) - return result - - # 检查条件是否满足 - if not spec.should_execute(): + should_skip, skip_reason = _check_upstream_skipped(spec, report) + if should_skip: result.status = TaskStatus.SKIPPED result.finished_at = datetime.now() - # 检查是哪个条件不满足 - failed_conditions = [] - for condition in spec.conditions: - try: - if not condition(): - failed_conditions.append(condition.__name__ or "匿名条件") - except Exception: - failed_conditions.append(condition.__name__ or "匿名条件(执行错误)") - if failed_conditions: - result.reason = f"条件不满足: {', '.join(failed_conditions)}" - elif spec.skip_if_missing and not spec._is_cmd_available(): - result.reason = f"命令不存在: {spec.cmd[0] if spec.cmd else 'unknown'}" - else: - result.reason = "条件不满足" + result.reason = skip_reason + logger.info("task %r skipped (上游任务被跳过)", spec.name) + return result + + # 检查条件是否满足 + skip_reason = _check_conditions_for_skip(spec) + if skip_reason is not None: + result.status = TaskStatus.SKIPPED + result.finished_at = datetime.now() + result.reason = skip_reason logger.info("task %r skipped (条件不满足)", spec.name) return result @@ -147,6 +184,36 @@ def _run_sync_with_retry( raise AssertionError("unreachable") # pragma: no cover +async def _execute_async_task( + spec: TaskSpec[Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], + loop: asyncio.AbstractEventLoop, +) -> Any: + """执行异步或同步任务(带超时处理)。 + + Returns + ------- + Any + 任务返回值 + """ + if _is_async_fn(spec): + coro = cast(Awaitable[Any], spec.effective_fn(*args, **kwargs)) + if spec.timeout is not None: + return await asyncio.wait_for(coro, timeout=spec.timeout) + else: + return await coro + else: + # 将同步工作卸载到线程,保持事件循环存活。 + def fn_call() -> Any: + return spec.effective_fn(*args, **kwargs) + + if spec.timeout is not None: + return await asyncio.wait_for(loop.run_in_executor(None, fn_call), timeout=spec.timeout) + else: + return await loop.run_in_executor(None, fn_call) + + async def _run_async_with_retry( spec: TaskSpec[Any], context: Mapping[str, Any], @@ -158,33 +225,20 @@ async def _run_async_with_retry( result: TaskResult[Any] = TaskResult[Any](spec=spec) # 检查上游任务是否被 SKIPPED - if report is not None: - for dep in spec.depends_on: - if dep in report.results and report.results[dep].status == TaskStatus.SKIPPED: - result.status = TaskStatus.SKIPPED - result.finished_at = datetime.now() - result.reason = f"上游任务 '{dep}' 被跳过" - logger.info("task %r skipped (上游任务 %r 被跳过)", spec.name, dep) - return result - - # 检查条件是否满足 - if not spec.should_execute(): + should_skip, skip_reason = _check_upstream_skipped(spec, report) + if should_skip: result.status = TaskStatus.SKIPPED result.finished_at = datetime.now() - # 检查是哪个条件不满足 - failed_conditions = [] - for condition in spec.conditions: - try: - if not condition(): - failed_conditions.append(condition.__name__ or "匿名条件") - except Exception: - failed_conditions.append(condition.__name__ or "匿名条件(执行错误)") - if failed_conditions: - result.reason = f"条件不满足: {', '.join(failed_conditions)}" - elif spec.skip_if_missing and not spec._is_cmd_available(): - result.reason = f"命令不存在: {spec.cmd[0] if spec.cmd else 'unknown'}" - else: - result.reason = "条件不满足" + result.reason = skip_reason + logger.info("task %r skipped (上游任务被跳过)", spec.name) + return result + + # 检查条件是否满足 + skip_reason = _check_conditions_for_skip(spec) + if skip_reason is not None: + result.status = TaskStatus.SKIPPED + result.finished_at = datetime.now() + result.reason = skip_reason logger.info("task %r skipped (条件不满足)", spec.name) return result @@ -196,21 +250,7 @@ async def _run_async_with_retry( while True: result.attempts += 1 try: - if _is_async_fn(spec): - coro = cast(Awaitable[Any], spec.effective_fn(*args, **kwargs)) - if spec.timeout is not None: - result.value = await asyncio.wait_for(coro, timeout=spec.timeout) - else: - result.value = await coro - else: - # 将同步工作卸载到线程,保持事件循环存活。 - def fn_call() -> Any: - return spec.effective_fn(*args, **kwargs) - - if spec.timeout is not None: - result.value = await asyncio.wait_for(loop.run_in_executor(None, fn_call), timeout=spec.timeout) - else: - result.value = await loop.run_in_executor(None, fn_call) + result.value = await _execute_async_task(spec, args, kwargs, loop) result.status = TaskStatus.SUCCESS result.finished_at = datetime.now() return result diff --git a/tests/test_executors.py b/tests/test_executors.py index dcf0b1e..482a48f 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -26,10 +26,12 @@ def test_sequential_basic() -> None: def double(extract: list[int]) -> list[int]: return [x * 2 for x in extract] - graph = px.Graph.from_specs([ - px.TaskSpec("extract", extract), - px.TaskSpec("double", double, depends_on=("extract",)), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("extract", extract), + px.TaskSpec("double", double, depends_on=("extract",)), + ] + ) report = px.run(graph, strategy="sequential") assert report.success assert report["extract"] == [1, 2, 3] @@ -46,12 +48,14 @@ def test_sequential_diamond() -> None: return fn - graph = px.Graph.from_specs([ - px.TaskSpec("a", make("a")), - px.TaskSpec("b", make("b"), depends_on=("a",)), - px.TaskSpec("c", make("c"), depends_on=("a",)), - px.TaskSpec("d", make("d"), depends_on=("b", "c")), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("a", make("a")), + px.TaskSpec("b", make("b"), depends_on=("a",)), + px.TaskSpec("c", make("c"), depends_on=("a",)), + px.TaskSpec("d", make("d"), depends_on=("b", "c")), + ] + ) report = px.run(graph, strategy="sequential") assert report.success assert report["d"] == "d" @@ -65,10 +69,12 @@ def test_failure_propagates() -> None: def downstream(_boom: None) -> int: return 1 - graph = px.Graph.from_specs([ - px.TaskSpec("boom", boom), - px.TaskSpec("downstream", downstream, depends_on=("boom",)), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("boom", boom), + px.TaskSpec("downstream", downstream, depends_on=("boom",)), + ] + ) with pytest.raises(TaskFailedError) as exc_info: _ = px.run(graph, strategy="sequential") assert exc_info.value.task == "boom" @@ -110,11 +116,13 @@ def test_threaded_parallelism() -> None: time.sleep(0.3) return "done" - graph = px.Graph.from_specs([ - px.TaskSpec("a", slow), - px.TaskSpec("b", slow), - px.TaskSpec("c", slow), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("a", slow), + px.TaskSpec("b", slow), + px.TaskSpec("c", slow), + ] + ) start = time.time() report = px.run(graph, strategy="thread", max_workers=3) elapsed = time.time() - start @@ -137,11 +145,13 @@ def test_threaded_layer_barrier() -> None: return fn - graph = px.Graph.from_specs([ - px.TaskSpec("a", make("a")), - px.TaskSpec("b", make("b")), - px.TaskSpec("c", make("c"), depends_on=("a", "b")), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("a", make("a")), + px.TaskSpec("b", make("b")), + px.TaskSpec("c", make("c"), depends_on=("a", "b")), + ] + ) report = px.run(graph, strategy="thread", max_workers=2) assert report.success # c must finish after both a and b. @@ -160,10 +170,12 @@ def test_async_basic() -> None: async def transform(fetch: int) -> int: return fetch * 2 - graph = px.Graph.from_specs([ - px.TaskSpec("fetch", fetch), - px.TaskSpec("transform", transform, depends_on=("fetch",)), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("fetch", fetch), + px.TaskSpec("transform", transform, depends_on=("fetch",)), + ] + ) report = px.run(graph, strategy="async") assert report.success assert report["transform"] == 84 @@ -175,11 +187,13 @@ def test_async_parallelism() -> None: await asyncio.sleep(0.3) return "done" - graph = px.Graph.from_specs([ - px.TaskSpec("a", slow), - px.TaskSpec("b", slow), - px.TaskSpec("c", slow), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("a", slow), + px.TaskSpec("b", slow), + px.TaskSpec("c", slow), + ] + ) start = time.time() report = px.run(graph, strategy="async") elapsed = time.time() - start @@ -195,10 +209,12 @@ def test_async_mixed_sync_and_async() -> None: await asyncio.sleep(0.01) return sync_task + 5 - graph = px.Graph.from_specs([ - px.TaskSpec("sync_task", sync_task), - px.TaskSpec("async_task", async_task, depends_on=("sync_task",)), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("sync_task", sync_task), + px.TaskSpec("async_task", async_task, depends_on=("sync_task",)), + ] + ) report = px.run(graph, strategy="async") assert report.success assert report["async_task"] == 15 @@ -246,10 +262,12 @@ def test_memory_backend_resume() -> None: return fn - graph = px.Graph.from_specs([ - px.TaskSpec("a", make("a")), - px.TaskSpec("b", make("b"), depends_on=("a",)), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("a", make("a")), + px.TaskSpec("b", make("b"), depends_on=("a",)), + ] + ) backend = MemoryBackend() _ = px.run(graph, strategy="sequential", state=backend) assert runs == ["a", "b"] @@ -375,10 +393,12 @@ def test_threaded_skips_cached_tasks() -> None: return fn - graph = px.Graph.from_specs([ - px.TaskSpec("a", make("a")), - px.TaskSpec("b", make("b"), depends_on=("a",)), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("a", make("a")), + px.TaskSpec("b", make("b"), depends_on=("a",)), + ] + ) backend = px.MemoryBackend() # 第一次运行填充缓存 _ = px.run(graph, strategy="thread", max_workers=2, state=backend) @@ -418,10 +438,12 @@ def test_async_skips_cached_tasks() -> None: runs.append("b") return a + "b" - graph = px.Graph.from_specs([ - px.TaskSpec("a", a), - px.TaskSpec("b", b, depends_on=("a",)), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("a", a), + px.TaskSpec("b", b, depends_on=("a",)), + ] + ) backend = px.MemoryBackend() _ = px.run(graph, strategy="async", state=backend) assert runs == ["a", "b"] @@ -497,10 +519,12 @@ def test_downstream_skipped_when_upstream_skipped_sequential() -> None: def downstream(upstream: str) -> str: return upstream + "_processed" - graph = px.Graph.from_specs([ - px.TaskSpec("upstream", cmd=["echo", "hello"], conditions=(never_true,)), - px.TaskSpec("downstream", downstream, depends_on=("upstream",)), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("upstream", cmd=["echo", "hello"], conditions=(never_true,)), + px.TaskSpec("downstream", downstream, depends_on=("upstream",)), + ] + ) report = px.run(graph, strategy="sequential") assert report.success assert report.result_of("upstream").status == px.TaskStatus.SKIPPED @@ -514,10 +538,12 @@ def test_downstream_skipped_when_upstream_skipped_thread() -> None: def downstream(upstream: str) -> str: return upstream + "_processed" - graph = px.Graph.from_specs([ - px.TaskSpec("upstream", cmd=["echo", "hello"], conditions=(never_true,)), - px.TaskSpec("downstream", downstream, depends_on=("upstream",)), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("upstream", cmd=["echo", "hello"], conditions=(never_true,)), + px.TaskSpec("downstream", downstream, depends_on=("upstream",)), + ] + ) report = px.run(graph, strategy="thread", max_workers=2) assert report.success assert report.result_of("upstream").status == px.TaskStatus.SKIPPED @@ -535,10 +561,12 @@ def test_downstream_skipped_when_upstream_skipped_async() -> None: never_true = lambda: False # noqa: E731 - graph = px.Graph.from_specs([ - px.TaskSpec("upstream", upstream, conditions=(never_true,)), - px.TaskSpec("downstream", downstream, depends_on=("upstream",)), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("upstream", upstream, conditions=(never_true,)), + px.TaskSpec("downstream", downstream, depends_on=("upstream",)), + ] + ) report = px.run(graph, strategy="async") assert report.success assert report.result_of("upstream").status == px.TaskStatus.SKIPPED @@ -555,10 +583,12 @@ def test_downstream_executes_when_upstream_succeeds() -> None: def downstream(upstream: str) -> str: return upstream + "_processed" - graph = px.Graph.from_specs([ - px.TaskSpec("upstream", upstream, conditions=(always_true,)), - px.TaskSpec("downstream", downstream, depends_on=("upstream",)), - ]) + graph = px.Graph.from_specs( + [ + px.TaskSpec("upstream", upstream, conditions=(always_true,)), + px.TaskSpec("downstream", downstream, depends_on=("upstream",)), + ] + ) report = px.run(graph, strategy="sequential") assert report.success assert report.result_of("upstream").status == px.TaskStatus.SUCCESS