From 9cc91d115306ebc5bdbe84378e2dc1f4fe9b374e Mon Sep 17 00:00:00 2001 From: gooker_young Date: Sun, 21 Jun 2026 21:43:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E8=B7=B3=E8=BF=87=E5=8E=9F=E5=9B=A0=E8=AE=B0=E5=BD=95=EF=BC=8C?= =?UTF-8?q?=E5=AE=8C=E5=96=84=E4=B8=8A=E6=B8=B8=E4=BB=BB=E5=8A=A1=E8=B7=B3?= =?UTF-8?q?=E8=BF=87=E4=BC=A0=E6=92=AD=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 为TaskResult和TaskEvent新增reason字段记录跳过原因 2. 为同步/异步任务执行器添加上游任务跳过检测,自动跳过下游任务 3. 完善任务跳过的原因判断,支持条件不满足、缓存命中、上游跳过场景 4. 优化gittool工具,新增排除目录配置和更灵活的git操作流程 5. 重构测试用例格式,新增上游任务跳过的测试覆盖 6. 默认启用verbose输出,优化跳过任务的日志提示 --- src/pyflowx/cli/gittool.py | 70 +++++++++++-- src/pyflowx/executors.py | 68 +++++++++++-- src/pyflowx/task.py | 2 + tests/test_executors.py | 199 ++++++++++++++++++++++++------------- 4 files changed, 249 insertions(+), 90 deletions(-) diff --git a/src/pyflowx/cli/gittool.py b/src/pyflowx/cli/gittool.py index 811d27a..5b76b99 100644 --- a/src/pyflowx/cli/gittool.py +++ b/src/pyflowx/cli/gittool.py @@ -10,39 +10,87 @@ from pathlib import Path import pyflowx as px +EXCLUDE_DIRS = [ + # 编辑器相关目录 + ".vscode", + ".idea", + ".editorconfig", + ".trae", + ".qoder", + # 项目相关目录 + ".venv", + ".git", + ".tox", + "node_modules", +] +EXCLUDE_CMDS = [arg for d in EXCLUDE_DIRS for arg in ["-e", d]] + def init_sub_dirs() -> None: """初始化子目录的Git仓库.""" sub_dirs = [subdir for subdir in Path.cwd().iterdir() if subdir.is_dir()] for subdir in sub_dirs: px.run( - px.Graph.from_specs( - [ - px.TaskSpec("init", cmd=["git", "init"], cwd=str(subdir)), - px.TaskSpec("add", cmd=["git", "add", "."], depends_on=["init"], cwd=str(subdir)), - px.TaskSpec( - "commit", cmd=["git", "commit", "-m", "init commit"], depends_on=["add"], cwd=str(subdir) - ), - ] - ), - verbose=True, + px.Graph.from_specs([ + px.TaskSpec( + "init", + cmd=["git", "init"], + conditions=[not_has_git_repo], + cwd=str(subdir), + ), + px.TaskSpec("add", cmd=["git", "add", "."], depends_on=["init"], cwd=str(subdir)), + px.TaskSpec("commit", cmd=["git", "commit", "-m", "init commit"], depends_on=["add"], cwd=str(subdir)), + ]), ) +isub: px.TaskSpec = px.TaskSpec("isub", fn=init_sub_dirs) push: px.TaskSpec = px.TaskSpec("push", cmd=["git", "push"]) pull: px.TaskSpec = px.TaskSpec("pull", cmd=["git", "pull"]) kill_tgit: px.TaskSpec = px.TaskSpec("task_kill", cmd=["taskkill", "/f", "/t", "/im", "tgitcache.exe"]) +def not_has_git_repo() -> bool: + """检查当前目录没有Git仓库.""" + return not Path.cwd().exists() or not (Path.cwd() / ".git").is_dir() + + +def has_files() -> bool: + """检查当前目录是否有文件.""" + return bool(list(Path.cwd().glob("*"))) + + def main() -> None: """Git工具主函数.""" runner = px.CliRunner( strategy="thread", description="Gittool - Git 执行工具.", graphs={ - "isub": px.Graph.from_specs([px.TaskSpec("isub", fn=init_sub_dirs)]), + # 添加并提交 + "a": px.Graph.from_specs([ + px.TaskSpec("add", cmd=["git", "add", "."], conditions=[has_files]), + px.TaskSpec("commit", cmd=["git", "commit", "-m", "chore: update"], depends_on=["add"]), + ]), + # 清理 + "c": px.Graph.from_specs([ + px.TaskSpec("clean", cmd=["git", "clean", "-xfd", *EXCLUDE_CMDS]), + px.TaskSpec("status", cmd=["git", "status", "--porcelain"], depends_on=["clean"]), + ]), + # 初始化、添加并提交 + "i": px.Graph.from_specs([ + px.TaskSpec("init", cmd=["git", "init"], conditions=[not_has_git_repo]), + px.TaskSpec("add", cmd=["git", "add", "."], depends_on=["init"], conditions=[has_files]), + px.TaskSpec( + "commit", cmd=["git", "commit", "-m", "init commit"], depends_on=["add"], conditions=[has_files] + ), + ]), + # 初始化子目录 + "isub": px.Graph.from_specs([isub]), + # 推送 "p": px.Graph.from_specs([push]), + # 拉取 "pl": px.Graph.from_specs([pull]), + # 重启TGit缓存 "r": px.Graph.from_specs([kill_tgit]), }, ) diff --git a/src/pyflowx/executors.py b/src/pyflowx/executors.py index 5fe32e3..d918371 100644 --- a/src/pyflowx/executors.py +++ b/src/pyflowx/executors.py @@ -54,6 +54,7 @@ def _emit( attempts=result.attempts, error=repr(result.error) if result.error else None, duration=result.duration, + reason=result.reason, ) ) @@ -91,14 +92,39 @@ def _run_sync_with_retry( context: Mapping[str, Any], layer_idx: int | None, on_event: EventCallback | None = None, + report: RunReport | None = None, ) -> TaskResult[Any]: """执行同步任务并带重试;返回填充好的 TaskResult。""" result: TaskResult[Any] = TaskResult(spec=spec) + # 检查上游任务是否被 SKIPPED + if report is not None: + for dep in spec.depends_on: + if dep in report.results and report.results[dep].status == TaskStatus.SKIPPED: + result.status = TaskStatus.SKIPPED + result.finished_at = datetime.now() + result.reason = f"上游任务 '{dep}' 被跳过" + logger.info("task %r skipped (上游任务 %r 被跳过)", spec.name, dep) + return result + # 检查条件是否满足 if not spec.should_execute(): result.status = TaskStatus.SKIPPED result.finished_at = datetime.now() + # 检查是哪个条件不满足 + failed_conditions = [] + for condition in spec.conditions: + try: + if not condition(): + failed_conditions.append(condition.__name__ or "匿名条件") + except Exception: + failed_conditions.append(condition.__name__ or "匿名条件(执行错误)") + if failed_conditions: + result.reason = f"条件不满足: {', '.join(failed_conditions)}" + elif spec.skip_if_missing and not spec._is_cmd_available(): + result.reason = f"命令不存在: {spec.cmd[0] if spec.cmd else 'unknown'}" + else: + result.reason = "条件不满足" logger.info("task %r skipped (条件不满足)", spec.name) return result @@ -126,14 +152,39 @@ async def _run_async_with_retry( context: Mapping[str, Any], layer_idx: int | None, on_event: EventCallback | None = None, + report: RunReport | None = None, ) -> TaskResult[Any]: """在事件循环上执行任务(同步或异步)并带重试。""" result: TaskResult[Any] = TaskResult[Any](spec=spec) + # 检查上游任务是否被 SKIPPED + if report is not None: + for dep in spec.depends_on: + if dep in report.results and report.results[dep].status == TaskStatus.SKIPPED: + result.status = TaskStatus.SKIPPED + result.finished_at = datetime.now() + result.reason = f"上游任务 '{dep}' 被跳过" + logger.info("task %r skipped (上游任务 %r 被跳过)", spec.name, dep) + return result + # 检查条件是否满足 if not spec.should_execute(): result.status = TaskStatus.SKIPPED result.finished_at = datetime.now() + # 检查是哪个条件不满足 + failed_conditions = [] + for condition in spec.conditions: + try: + if not condition(): + failed_conditions.append(condition.__name__ or "匿名条件") + except Exception: + failed_conditions.append(condition.__name__ or "匿名条件(执行错误)") + if failed_conditions: + result.reason = f"条件不满足: {', '.join(failed_conditions)}" + elif spec.skip_if_missing and not spec._is_cmd_available(): + result.reason = f"命令不存在: {spec.cmd[0] if spec.cmd else 'unknown'}" + else: + result.reason = "条件不满足" logger.info("task %r skipped (条件不满足)", spec.name) return result @@ -207,12 +258,12 @@ def _execute_layer_sequential( if backend.has(name): cached = backend.get(name) context[name] = cached - result = TaskResult(spec=spec, status=TaskStatus.SKIPPED, value=cached) + result = TaskResult(spec=spec, status=TaskStatus.SKIPPED, value=cached, reason="缓存命中") report.results[name] = result _emit(on_event, result) logger.info("task %r skipped (cached)", name) continue - result = _run_sync_with_retry(spec, _build_context(spec, context), layer_idx, on_event) + result = _run_sync_with_retry(spec, _build_context(spec, context), layer_idx, on_event, report) context[name] = result.value backend.save(name, result.value) report.results[name] = result @@ -236,7 +287,7 @@ def _execute_layer_threaded( if backend.has(name): cached = backend.get(name) context[name] = cached - result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached) + result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached, reason="缓存命中") report.results[name] = result _emit(on_event, result) else: @@ -251,7 +302,7 @@ def _execute_layer_threaded( spec = graph.spec(name) # 为本任务快照上下文以避免竞态。 task_ctx = _build_context(spec, context) - fut = pool.submit(_run_sync_with_retry, spec, task_ctx, layer_idx, on_event) + fut = pool.submit(_run_sync_with_retry, spec, task_ctx, layer_idx, on_event, report) future_to_name[fut] = name for fut in concurrent.futures.as_completed(future_to_name): @@ -278,7 +329,7 @@ async def _execute_layer_async( if backend.has(name): cached = backend.get(name) context[name] = cached - result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached) + result = TaskResult(spec=graph.spec(name), status=TaskStatus.SKIPPED, value=cached, reason="缓存命中") report.results[name] = result _emit(on_event, result) else: @@ -291,7 +342,7 @@ async def _execute_layer_async( for name in to_run: spec = graph.spec(name) task_ctx = _build_context(spec, context) - coros.append(_run_async_with_retry(spec, task_ctx, layer_idx, on_event)) + coros.append(_run_async_with_retry(spec, task_ctx, layer_idx, on_event, report)) results = await asyncio.gather(*coros) for name, result in zip(to_run, results): @@ -334,7 +385,8 @@ def _make_verbose_callback( flush=True, ) elif event.status == TaskStatus.SKIPPED: # pragma: no branch - print(f"[verbose] 任务 {event.task!r} 跳过", flush=True) + reason = f" ({event.reason})" if event.reason else "" + print(f"[verbose] 任务 {event.task!r} 跳过{reason}", flush=True) else: # pragma: no cover # 不可达: 执行器只发出 RUNNING/SUCCESS/FAILED/SKIPPED 事件 pass @@ -351,7 +403,7 @@ def run( *, max_workers: int | None = None, dry_run: bool = False, - verbose: bool = False, + verbose: bool = True, on_event: EventCallback | None = None, state: StateBackend | None = None, ) -> RunReport: diff --git a/src/pyflowx/task.py b/src/pyflowx/task.py index 7a78090..d3ec6a8 100644 --- a/src/pyflowx/task.py +++ b/src/pyflowx/task.py @@ -308,6 +308,7 @@ class TaskResult(Generic[T]): attempts: int = 0 started_at: Optional[datetime] = None finished_at: Optional[datetime] = None + reason: Optional[str] = None # 跳过原因 @property def duration(self) -> Optional[float]: @@ -330,3 +331,4 @@ class TaskEvent: attempts: int = 0 error: Optional[str] = None duration: Optional[float] = None + reason: Optional[str] = None # 跳过原因,如 "条件不满足"、"上游任务被跳过"、"缓存" diff --git a/tests/test_executors.py b/tests/test_executors.py index 48fa4bf..dcf0b1e 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -26,12 +26,10 @@ def test_sequential_basic() -> None: def double(extract: list[int]) -> list[int]: return [x * 2 for x in extract] - graph = px.Graph.from_specs( - [ - px.TaskSpec("extract", extract), - px.TaskSpec("double", double, depends_on=("extract",)), - ] - ) + graph = px.Graph.from_specs([ + px.TaskSpec("extract", extract), + px.TaskSpec("double", double, depends_on=("extract",)), + ]) report = px.run(graph, strategy="sequential") assert report.success assert report["extract"] == [1, 2, 3] @@ -48,14 +46,12 @@ def test_sequential_diamond() -> None: return fn - graph = px.Graph.from_specs( - [ - px.TaskSpec("a", make("a")), - px.TaskSpec("b", make("b"), depends_on=("a",)), - px.TaskSpec("c", make("c"), depends_on=("a",)), - px.TaskSpec("d", make("d"), depends_on=("b", "c")), - ] - ) + graph = px.Graph.from_specs([ + px.TaskSpec("a", make("a")), + px.TaskSpec("b", make("b"), depends_on=("a",)), + px.TaskSpec("c", make("c"), depends_on=("a",)), + px.TaskSpec("d", make("d"), depends_on=("b", "c")), + ]) report = px.run(graph, strategy="sequential") assert report.success assert report["d"] == "d" @@ -69,12 +65,10 @@ def test_failure_propagates() -> None: def downstream(_boom: None) -> int: return 1 - graph = px.Graph.from_specs( - [ - px.TaskSpec("boom", boom), - px.TaskSpec("downstream", downstream, depends_on=("boom",)), - ] - ) + graph = px.Graph.from_specs([ + px.TaskSpec("boom", boom), + px.TaskSpec("downstream", downstream, depends_on=("boom",)), + ]) with pytest.raises(TaskFailedError) as exc_info: _ = px.run(graph, strategy="sequential") assert exc_info.value.task == "boom" @@ -116,13 +110,11 @@ def test_threaded_parallelism() -> None: time.sleep(0.3) return "done" - graph = px.Graph.from_specs( - [ - px.TaskSpec("a", slow), - px.TaskSpec("b", slow), - px.TaskSpec("c", slow), - ] - ) + graph = px.Graph.from_specs([ + px.TaskSpec("a", slow), + px.TaskSpec("b", slow), + px.TaskSpec("c", slow), + ]) start = time.time() report = px.run(graph, strategy="thread", max_workers=3) elapsed = time.time() - start @@ -145,13 +137,11 @@ def test_threaded_layer_barrier() -> None: return fn - graph = px.Graph.from_specs( - [ - px.TaskSpec("a", make("a")), - px.TaskSpec("b", make("b")), - px.TaskSpec("c", make("c"), depends_on=("a", "b")), - ] - ) + graph = px.Graph.from_specs([ + px.TaskSpec("a", make("a")), + px.TaskSpec("b", make("b")), + px.TaskSpec("c", make("c"), depends_on=("a", "b")), + ]) report = px.run(graph, strategy="thread", max_workers=2) assert report.success # c must finish after both a and b. @@ -170,12 +160,10 @@ def test_async_basic() -> None: async def transform(fetch: int) -> int: return fetch * 2 - graph = px.Graph.from_specs( - [ - px.TaskSpec("fetch", fetch), - px.TaskSpec("transform", transform, depends_on=("fetch",)), - ] - ) + graph = px.Graph.from_specs([ + px.TaskSpec("fetch", fetch), + px.TaskSpec("transform", transform, depends_on=("fetch",)), + ]) report = px.run(graph, strategy="async") assert report.success assert report["transform"] == 84 @@ -187,13 +175,11 @@ def test_async_parallelism() -> None: await asyncio.sleep(0.3) return "done" - graph = px.Graph.from_specs( - [ - px.TaskSpec("a", slow), - px.TaskSpec("b", slow), - px.TaskSpec("c", slow), - ] - ) + graph = px.Graph.from_specs([ + px.TaskSpec("a", slow), + px.TaskSpec("b", slow), + px.TaskSpec("c", slow), + ]) start = time.time() report = px.run(graph, strategy="async") elapsed = time.time() - start @@ -209,12 +195,10 @@ def test_async_mixed_sync_and_async() -> None: await asyncio.sleep(0.01) return sync_task + 5 - graph = px.Graph.from_specs( - [ - px.TaskSpec("sync_task", sync_task), - px.TaskSpec("async_task", async_task, depends_on=("sync_task",)), - ] - ) + graph = px.Graph.from_specs([ + px.TaskSpec("sync_task", sync_task), + px.TaskSpec("async_task", async_task, depends_on=("sync_task",)), + ]) report = px.run(graph, strategy="async") assert report.success assert report["async_task"] == 15 @@ -262,12 +246,10 @@ def test_memory_backend_resume() -> None: return fn - graph = px.Graph.from_specs( - [ - px.TaskSpec("a", make("a")), - px.TaskSpec("b", make("b"), depends_on=("a",)), - ] - ) + graph = px.Graph.from_specs([ + px.TaskSpec("a", make("a")), + px.TaskSpec("b", make("b"), depends_on=("a",)), + ]) backend = MemoryBackend() _ = px.run(graph, strategy="sequential", state=backend) assert runs == ["a", "b"] @@ -393,12 +375,10 @@ def test_threaded_skips_cached_tasks() -> None: return fn - graph = px.Graph.from_specs( - [ - px.TaskSpec("a", make("a")), - px.TaskSpec("b", make("b"), depends_on=("a",)), - ] - ) + graph = px.Graph.from_specs([ + px.TaskSpec("a", make("a")), + px.TaskSpec("b", make("b"), depends_on=("a",)), + ]) backend = px.MemoryBackend() # 第一次运行填充缓存 _ = px.run(graph, strategy="thread", max_workers=2, state=backend) @@ -438,12 +418,10 @@ def test_async_skips_cached_tasks() -> None: runs.append("b") return a + "b" - graph = px.Graph.from_specs( - [ - px.TaskSpec("a", a), - px.TaskSpec("b", b, depends_on=("a",)), - ] - ) + graph = px.Graph.from_specs([ + px.TaskSpec("a", a), + px.TaskSpec("b", b, depends_on=("a",)), + ]) backend = px.MemoryBackend() _ = px.run(graph, strategy="async", state=backend) assert runs == ["a", "b"] @@ -507,3 +485,82 @@ def test_run_empty_graph() -> None: report = px.run(graph, strategy="sequential") assert report.success assert len(report) == 0 + + +# ---------------------------------------------------------------------- # +# 上游任务被 SKIPPED 后,下游任务也应被 SKIPPED +# ---------------------------------------------------------------------- # +def test_downstream_skipped_when_upstream_skipped_sequential() -> None: + """上游任务被 SKIPPED 后,下游任务也应被 SKIPPED(sequential 策略).""" + never_true = lambda: False # noqa: E731 + + def downstream(upstream: str) -> str: + return upstream + "_processed" + + graph = px.Graph.from_specs([ + px.TaskSpec("upstream", cmd=["echo", "hello"], conditions=(never_true,)), + px.TaskSpec("downstream", downstream, depends_on=("upstream",)), + ]) + report = px.run(graph, strategy="sequential") + assert report.success + assert report.result_of("upstream").status == px.TaskStatus.SKIPPED + assert report.result_of("downstream").status == px.TaskStatus.SKIPPED + + +def test_downstream_skipped_when_upstream_skipped_thread() -> None: + """上游任务被 SKIPPED 后,下游任务也应被 SKIPPED(thread 策略).""" + never_true = lambda: False # noqa: E731 + + def downstream(upstream: str) -> str: + return upstream + "_processed" + + graph = px.Graph.from_specs([ + px.TaskSpec("upstream", cmd=["echo", "hello"], conditions=(never_true,)), + px.TaskSpec("downstream", downstream, depends_on=("upstream",)), + ]) + report = px.run(graph, strategy="thread", max_workers=2) + assert report.success + assert report.result_of("upstream").status == px.TaskStatus.SKIPPED + assert report.result_of("downstream").status == px.TaskStatus.SKIPPED + + +def test_downstream_skipped_when_upstream_skipped_async() -> None: + """上游任务被 SKIPPED 后,下游任务也应被 SKIPPED(async 策略).""" + + async def upstream() -> str: + return "hello" + + async def downstream(upstream: str) -> str: + return upstream + "_processed" + + never_true = lambda: False # noqa: E731 + + graph = px.Graph.from_specs([ + px.TaskSpec("upstream", upstream, conditions=(never_true,)), + px.TaskSpec("downstream", downstream, depends_on=("upstream",)), + ]) + report = px.run(graph, strategy="async") + assert report.success + assert report.result_of("upstream").status == px.TaskStatus.SKIPPED + assert report.result_of("downstream").status == px.TaskStatus.SKIPPED + + +def test_downstream_executes_when_upstream_succeeds() -> None: + """上游任务成功时,下游任务应正常执行.""" + always_true = lambda: True # noqa: E731 + + def upstream() -> str: + return "hello" + + def downstream(upstream: str) -> str: + return upstream + "_processed" + + graph = px.Graph.from_specs([ + px.TaskSpec("upstream", upstream, conditions=(always_true,)), + px.TaskSpec("downstream", downstream, depends_on=("upstream",)), + ]) + report = px.run(graph, strategy="sequential") + assert report.success + assert report.result_of("upstream").status == px.TaskStatus.SUCCESS + assert report.result_of("downstream").status == px.TaskStatus.SUCCESS + assert report["downstream"] == "hello_processed"