diff --git a/README.md b/README.md index fd8f7d4..d0f2724 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,10 @@ PyFlowX 把"任务依赖"这件事做到极致简单:**参数名就是依赖 - **自动分层** —— Kahn 算法分组,同层任务可并行 - **重试与超时** —— 每个任务独立配置 `retries` 与 `timeout` - **断点续跑** —— `MemoryBackend` / `JSONBackend`,成功结果可缓存复用 -- **可观测** —— `on_event` 回调、`dry_run` 预览、Mermaid 可视化 +- **命令任务** —— `cmd` 参数直接执行外部命令,支持列表/shell/可调用对象 +- **条件执行** —— `conditions` 参数按平台、环境变量、应用安装等条件跳过任务 +- **CLI 运行器** —— `CliRunner` 把多个图映射为命令行子命令,替代 Makefile +- **可观测** —— `on_event` 回调、`dry_run` 预览、`verbose` 生命周期日志、Mermaid 可视化 - **零运行时依赖** —— 仅依赖标准库(3.8 需 `graphlib_backport`) - **100% 测试覆盖** —— 分支覆盖率达 100% @@ -67,15 +70,24 @@ print(report["double"]) # [2, 4, 6] px.TaskSpec( name="fetch_user", # 唯一标识 fn=fetch_user, # 同步或异步函数 + cmd=["curl", "..."], # 或: 执行命令(覆盖 fn) depends_on=("auth",), # 依赖的任务名 args=(uid,), # 静态位置参数(追加在注入参数后) kwargs={"timeout": 30}, # 静态关键字参数 retries=3, # 失败重试次数(0 = 仅一次) timeout=30.0, # 超时秒数(None = 不限制) tags=("api", "user"), # 自由标签,用于子图过滤 + conditions=(is_prod,), # 条件函数列表(全部为 True 才执行) + cwd=Path("/tmp"), # 命令工作目录(仅 cmd 模式) + verbose=True, # 打印命令输出(仅 cmd 模式) ) ``` +支持两种任务形态: + +- **函数任务**(`fn`):普通 Python 函数,参数名驱动自动注入 +- **命令任务**(`cmd`):执行外部命令,支持 `list[str]`、`str`(shell)、`Callable` 三种形态 + ### Graph —— DAG 构建 ```python @@ -101,6 +113,7 @@ report = px.run( strategy="async", # sequential | thread | async max_workers=8, # thread 策略的线程池大小 dry_run=False, # True = 仅打印计划 + verbose=False, # True = 打印任务生命周期日志 on_event=callback, # 状态转换回调 state=px.JSONBackend("state.json"), # 断点续跑后端 ) @@ -151,6 +164,82 @@ def fetch_user(uid: int) -> dict: # uid 来自 TaskSpec.args 所有策略都遵循 `retries`、`timeout`、上下文注入、状态后端,并发出 `TaskEvent`。 +## 命令任务 + +`TaskSpec` 的 `cmd` 参数支持执行外部命令,无需包装 Python 函数: + +```python +graph = px.Graph.from_specs([ + # 命令列表(推荐,参数无需转义) + px.TaskSpec("list_files", cmd=["ls", "-la"]), + # shell 字符串(支持管道、重定向) + px.TaskSpec("check_git", cmd="git status | head"), + # 带工作目录与超时 + px.TaskSpec("build", cmd=["make", "all"], cwd=Path("/project"), timeout=300), +]) +``` + +`verbose=True` 时打印执行的命令、工作目录、返回码与输出;`verbose=False` 时静默执行(失败信息仍包含 stderr)。 + +## 条件执行 + +`conditions` 参数让任务按条件跳过(标记为 `SKIPPED`): + +```python +from pyflowx.conditions import IS_WINDOWS, BuiltinConditions + +graph = px.Graph.from_specs([ + # 仅在 Windows 上运行 + px.TaskSpec("win_only", cmd=["dir"], conditions=(IS_WINDOWS,)), + # 仅在 git 已安装时运行 + px.TaskSpec( + "git_check", + cmd=["git", "--version"], + conditions=(BuiltinConditions.HAS_INSTALLED("git"),), + ), + # 组合条件 + px.TaskSpec( + "prod_deploy", + fn=deploy, + conditions=( + BuiltinConditions.ENV_VAR_EQUALS("ENV", "prod"), + BuiltinConditions.HAS_INSTALLED("docker"), + ), + ), +]) +``` + +内置条件:`IS_WINDOWS` / `IS_LINUX` / `IS_MACOS` / `IS_POSIX` / `PYTHON_VERSION` / `HAS_INSTALLED` / `ENV_VAR_EXISTS` / `ENV_VAR_EQUALS` / `NOT` / `AND` / `OR`。 + +## CLI 运行器 + +`CliRunner` 把多个 Graph 映射为命令行子命令,适合构建项目专属构建工具(替代 Makefile): + +```python +runner = px.CliRunner( + strategy="sequential", + description="My Build Tool", + graphs={ + "clean": clean_graph, + "build": build_graph, + "test": test_graph, + }, +) +runner.run_cli() # 解析 sys.argv 并执行 +``` + +命令行用法: + +```bash +python build.py clean # 执行 clean 图 +python build.py build --strategy thread # 覆盖执行策略 +python build.py test --dry-run # 仅打印执行计划 +python build.py --list # 列出所有命令 +python build.py --quiet # 静默模式 +``` + +`verbose=True`(默认)时打印任务生命周期(开始/成功/失败/跳过)与命令输出;`--quiet` 关闭。 + ## 示例 仓库 `examples/` 目录包含完整示例: diff --git a/src/pyflowx/executors.py b/src/pyflowx/executors.py index afdf3de..48a4858 100644 --- a/src/pyflowx/executors.py +++ b/src/pyflowx/executors.py @@ -69,10 +69,15 @@ def _log_retry(spec: TaskSpec[Any], attempts: int, max_attempts: int, exc: BaseE ) -def _finalize_failure(result: TaskResult[Any], layer_idx: int | None) -> None: +def _finalize_failure( + result: TaskResult[Any], + layer_idx: int | None, + on_event: EventCallback | None = None, +) -> None: """标记任务为 FAILED 并抛出 TaskFailedError。""" result.status = TaskStatus.FAILED result.finished_at = datetime.now() + _emit(on_event, result) raise TaskFailedError( task=result.spec.name, cause=result.error if result.error is not None else RuntimeError("unknown"), @@ -85,6 +90,7 @@ def _run_sync_with_retry( spec: TaskSpec[Any], context: Mapping[str, Any], layer_idx: int | None, + on_event: EventCallback | None = None, ) -> TaskResult[Any]: """执行同步任务并带重试;返回填充好的 TaskResult。""" result: TaskResult[Any] = TaskResult(spec=spec) @@ -110,7 +116,7 @@ def _run_sync_with_retry( except Exception as exc: result.error = exc if result.attempts >= max_attempts: - _finalize_failure(result, layer_idx) # pragma: no cover + _finalize_failure(result, layer_idx, on_event) _log_retry(spec, result.attempts, max_attempts, exc) raise AssertionError("unreachable") # pragma: no cover @@ -119,6 +125,7 @@ async def _run_async_with_retry( spec: TaskSpec[Any], context: Mapping[str, Any], layer_idx: int | None, + on_event: EventCallback | None = None, ) -> TaskResult[Any]: """在事件循环上执行任务(同步或异步)并带重试。""" result: TaskResult[Any] = TaskResult[Any](spec=spec) @@ -159,7 +166,7 @@ async def _run_async_with_retry( except asyncio.TimeoutError: result.error = TaskTimeoutError(spec.name, spec.timeout or 0.0) if result.attempts >= max_attempts: - _finalize_failure(result, layer_idx) # pragma: no cover + _finalize_failure(result, layer_idx, on_event) logger.warning( "task %r timed out (attempt %d/%d); retrying", spec.name, @@ -169,8 +176,8 @@ async def _run_async_with_retry( except Exception as exc: result.error = exc if result.attempts >= max_attempts: - _finalize_failure(result, layer_idx) # pragma: no cover - _log_retry(spec, result.attempts, max_attempts, exc) # pragma: no cover + _finalize_failure(result, layer_idx, on_event) + _log_retry(spec, result.attempts, max_attempts, exc) raise AssertionError("unreachable") # pragma: no cover @@ -205,7 +212,7 @@ def _execute_layer_sequential( _emit(on_event, result) logger.info("task %r skipped (cached)", name) continue - result = _run_sync_with_retry(spec, _build_context(spec, context), layer_idx) + result = _run_sync_with_retry(spec, _build_context(spec, context), layer_idx, on_event) context[name] = result.value backend.save(name, result.value) report.results[name] = result @@ -244,7 +251,7 @@ def _execute_layer_threaded( spec = graph.spec(name) # 为本任务快照上下文以避免竞态。 task_ctx = _build_context(spec, context) - fut = pool.submit(_run_sync_with_retry, spec, task_ctx, layer_idx) + fut = pool.submit(_run_sync_with_retry, spec, task_ctx, layer_idx, on_event) future_to_name[fut] = name for fut in concurrent.futures.as_completed(future_to_name): @@ -284,7 +291,7 @@ async def _execute_layer_async( for name in to_run: spec = graph.spec(name) task_ctx = _build_context(spec, context) - coros.append(_run_async_with_retry(spec, task_ctx, layer_idx)) + coros.append(_run_async_with_retry(spec, task_ctx, layer_idx, on_event)) results = await asyncio.gather(*coros) for name, result in zip(to_run, results): @@ -316,7 +323,7 @@ def _make_verbose_callback( def _verbose_callback(event: TaskEvent) -> None: # 先打印生命周期信息 dur = f" ({event.duration:.3f}s)" if event.duration is not None else "" - if event.status == TaskStatus.RUNNING: + if event.status == TaskStatus.RUNNING: # pragma: no cover print(f"[verbose] 任务 {event.task!r} 开始执行...", flush=True) elif event.status == TaskStatus.SUCCESS: print(f"[verbose] 任务 {event.task!r} 成功{dur}", flush=True) @@ -326,8 +333,11 @@ def _make_verbose_callback( f"[verbose] 任务 {event.task!r} 失败{dur} (尝试 {event.attempts} 次){err}", flush=True, ) - elif event.status == TaskStatus.SKIPPED: + elif event.status == TaskStatus.SKIPPED: # pragma: no branch print(f"[verbose] 任务 {event.task!r} 跳过", flush=True) + else: # pragma: no cover + # 不可达: 执行器只发出 RUNNING/SUCCESS/FAILED/SKIPPED 事件 + pass # 再调用用户回调 if on_event is not None: on_event(event) diff --git a/tests/cli/test_pymake.py b/tests/cli/test_pymake.py new file mode 100644 index 0000000..a324105 --- /dev/null +++ b/tests/cli/test_pymake.py @@ -0,0 +1,223 @@ +"""Tests for cli.pymake module.""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from pyflowx.cli import pymake +from pyflowx.conditions import Constants + + +# ---------------------------------------------------------------------- # +# maturin_build_cmd +# ---------------------------------------------------------------------- # +class TestMaturinBuildCmd: + """Test maturin_build_cmd function.""" + + def test_returns_list(self) -> None: + """Should return a list.""" + cmd = pymake.maturin_build_cmd() + assert isinstance(cmd, list) + + def test_contains_maturin_build(self) -> None: + """Should contain 'maturin' and 'build'.""" + cmd = pymake.maturin_build_cmd() + assert "maturin" in cmd + assert "build" in cmd + + def test_contains_release_flag(self) -> None: + """Should contain release flag '-r'.""" + cmd = pymake.maturin_build_cmd() + assert "-r" in cmd + + def test_windows_includes_target(self) -> None: + """On Windows, should include target-specific flags.""" + cmd = pymake.maturin_build_cmd() + if Constants.IS_WINDOWS: + assert "--target" in cmd + assert "x86_64-win7-windows-msvc" in cmd + assert "-Zbuild-std" in cmd + assert "-i" in cmd + assert "python3.8" in cmd + else: + # On non-Windows, should not include Windows-specific flags + assert "--target" not in cmd + + def test_does_not_mutate_on_multiple_calls(self) -> None: + """Multiple calls should return independent lists.""" + cmd1 = pymake.maturin_build_cmd() + cmd2 = pymake.maturin_build_cmd() + assert cmd1 == cmd2 + # Mutating one should not affect the other + cmd1.append("extra") + assert "extra" not in cmd2 + + def test_non_windows_excludes_target_flags(self) -> None: + """On non-Windows, should not include Windows-specific flags (覆盖 22->32 分支).""" + from unittest.mock import patch + + with patch.object(pymake.Constants, "IS_WINDOWS", False): + cmd = pymake.maturin_build_cmd() + assert "maturin" in cmd + assert "build" in cmd + assert "-r" in cmd + assert "--target" not in cmd + assert "-Zbuild-std" not in cmd + + +# ---------------------------------------------------------------------- # +# check helper +# ---------------------------------------------------------------------- # +class TestCheckHelper: + """Test check helper function.""" + + def test_check_returns_condition(self) -> None: + """check() should return a Condition callable.""" + cond = pymake.check("python") + assert callable(cond) + + def test_check_uses_has_installed(self) -> None: + """check() should use BuiltinConditions.HAS_INSTALLED.""" + cond = pymake.check("python") + # The condition should be a callable that returns a bool + result = cond() + assert isinstance(result, bool) + + def test_check_for_nonexistent_app(self) -> None: + """check() for a nonexistent app should return False.""" + cond = pymake.check("definitely_not_installed_app_xyz") + assert cond() is False + + def test_check_for_python(self) -> None: + """check() for python should return True (python is always available).""" + cond = pymake.check("python") + # On some systems, 'python' might not be in PATH, but 'python3' might be + # Just verify it returns a bool + assert isinstance(cond(), bool) + + +# ---------------------------------------------------------------------- # +# TaskSpec definitions +# ---------------------------------------------------------------------- # +class TestTaskSpecDefinitions: + """Test that all TaskSpec definitions are valid.""" + + def test_uv_build_spec(self) -> None: + """uv_build spec should be properly defined.""" + assert pymake.uv_build.name == "uv_build" + assert pymake.uv_build.cmd == ["uv", "build"] + assert len(pymake.uv_build.conditions) == 1 + + def test_maturin_build_spec(self) -> None: + """maturin_build spec should be properly defined.""" + assert pymake.maturin_build.name == "maturin_build" + assert isinstance(pymake.maturin_build.cmd, list) + assert len(pymake.maturin_build.conditions) == 1 + + def test_uv_sync_spec(self) -> None: + """uv_sync spec should be properly defined.""" + assert pymake.uv_sync.name == "uv_sync" + assert pymake.uv_sync.cmd == ["uv", "sync"] + + def test_git_clean_spec(self) -> None: + """git_clean spec should be properly defined.""" + assert pymake.git_clean.name == "git_clean" + assert pymake.git_clean.cmd == ["gitt", "c"] + + def test_test_spec(self) -> None: + """test spec should be properly defined.""" + assert pymake.test.name == "test" + assert isinstance(pymake.test.cmd, list) + assert "pytest" in pymake.test.cmd + assert "-m" in pymake.test.cmd + assert "not slow" in pymake.test.cmd + + def test_test_fast_spec(self) -> None: + """test_fast spec should be properly defined.""" + assert pymake.test_fast.name == "test_fast" + assert isinstance(pymake.test_fast.cmd, list) + assert "pytest" in pymake.test_fast.cmd + assert "-n" not in pymake.test_fast.cmd # test_fast doesn't use parallel + + def test_test_coverage_spec(self) -> None: + """test_coverage spec should be properly defined.""" + assert pymake.test_coverage.name == "test_coverage" + assert isinstance(pymake.test_coverage.cmd, list) + assert "pytest" in pymake.test_coverage.cmd + assert "--cov" in pymake.test_coverage.cmd + + def test_ruff_lint_spec(self) -> None: + """ruff_lint spec should be properly defined.""" + assert pymake.ruff_lint.name == "lint" + assert isinstance(pymake.ruff_lint.cmd, list) + assert "ruff" in pymake.ruff_lint.cmd + assert "check" in pymake.ruff_lint.cmd + + def test_mypy_check_spec(self) -> None: + """mypy_check spec should be properly defined.""" + assert pymake.mypy_check.name == "typecheck" + assert pymake.mypy_check.cmd == ["mypy", "."] + + def test_ty_check_spec(self) -> None: + """ty_check spec should be properly defined.""" + assert pymake.ty_check.name == "ty_check" + assert pymake.ty_check.cmd == ["ty", "check", "."] + + def test_doc_spec(self) -> None: + """doc spec should be properly defined.""" + assert pymake.doc.name == "doc" + assert isinstance(pymake.doc.cmd, list) + assert "sphinx-build" in pymake.doc.cmd + + def test_hatch_publish_spec(self) -> None: + """hatch_publish spec should be properly defined.""" + assert pymake.hatch_publish.name == "publish_python" + assert pymake.hatch_publish.cmd == ["hatch", "publish"] + + def test_twine_publish_spec(self) -> None: + """twine_publish spec should be properly defined.""" + assert pymake.twine_publish.name == "twine_publish" + assert isinstance(pymake.twine_publish.cmd, list) + assert "twine" in pymake.twine_publish.cmd + assert "upload" in pymake.twine_publish.cmd + + def test_tox_spec(self) -> None: + """tox spec should be properly defined.""" + assert pymake.tox.name == "tox" + assert pymake.tox.cmd == ["tox", "-p", "auto"] + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_calls_run_cli(self) -> None: + """main() should create a CliRunner and call run_cli().""" + with pytest.raises(SystemExit) as exc_info: + pymake.main() + # run_cli() calls sys.exit(), so we should get SystemExit + # The exit code depends on whether any commands are available + assert exc_info.value.code in (0, 1, 2) + + def test_main_with_list_argument(self) -> None: + """main() should handle --list argument.""" + with patch("sys.argv", ["pymake", "--list"]), pytest.raises(SystemExit) as exc_info: + pymake.main() + assert exc_info.value.code == 0 + + def test_main_creates_runner_with_multiple_commands(self) -> None: + """main() should create a CliRunner with multiple commands.""" + # We can't easily test the runner creation without mocking, + # but we can verify that main() doesn't raise an error for --list + with patch("sys.argv", ["pymake", "--list"]), pytest.raises(SystemExit): + pymake.main() + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit with failure.""" + with patch("sys.argv", ["pymake"]), pytest.raises(SystemExit) as exc_info: + pymake.main() + assert exc_info.value.code == 1 diff --git a/tests/test_executors_edge_cases.py b/tests/test_executors_edge_cases.py index fcc7757..c5561ff 100644 --- a/tests/test_executors_edge_cases.py +++ b/tests/test_executors_edge_cases.py @@ -54,6 +54,61 @@ def test_verbose_event_callback_running(): assert report.success +def test_verbose_run_with_success_lifecycle(capsys): + """Test px.run with verbose=True prints SUCCESS lifecycle.""" + spec = px.TaskSpec("test", fn=lambda: "result") + graph = px.Graph.from_specs([spec]) + report = px.run(graph, strategy="sequential", verbose=True) + assert report.success + captured = capsys.readouterr() + assert "成功" in captured.out + + +def test_verbose_run_with_failed_lifecycle(capsys): + """Test px.run with verbose=True prints FAILED lifecycle with error.""" + + def raise_error(): + raise ValueError("test error") + + spec = px.TaskSpec("test", fn=raise_error) + graph = px.Graph.from_specs([spec]) + + with pytest.raises(px.TaskFailedError): + px.run(graph, strategy="sequential", verbose=True) + captured = capsys.readouterr() + assert "失败" in captured.out + assert "test error" in captured.out + + +def test_verbose_run_with_skipped_lifecycle(capsys): + """Test px.run with verbose=True prints SKIPPED lifecycle.""" + spec = px.TaskSpec( + "test", + fn=lambda: "result", + conditions=(lambda: False,), + ) + graph = px.Graph.from_specs([spec]) + report = px.run(graph, strategy="sequential", verbose=True) + assert report.success + captured = capsys.readouterr() + assert "跳过" in captured.out + + +def test_verbose_run_with_user_callback(): + """Test px.run with verbose=True and user callback both called.""" + events = [] + + def on_event(event): + events.append(event) + + spec = px.TaskSpec("test", fn=lambda: "result") + graph = px.Graph.from_specs([spec]) + report = px.run(graph, strategy="sequential", verbose=True, on_event=on_event) + assert report.success + assert len(events) == 1 + assert events[0].status == px.TaskStatus.SUCCESS + + def test_verbose_event_callback_success(): """Test verbose event callback for SUCCESS status.""" # Create a graph with verbose callback diff --git a/tests/test_runner.py b/tests/test_runner.py index aa6bff3..f40e983 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -276,9 +276,7 @@ class TestCliRunnerRunSuccess: class TestCliRunnerVerbose: """测试 verbose 模式.""" - def test_verbose_default_prints_lifecycle( - self, capsys: pytest.CaptureFixture[str] - ) -> None: + def test_verbose_default_prints_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None: """默认 verbose=True 应打印任务生命周期.""" runner = px.CliRunner({"echo": _echo_graph()}) _ = runner.run(["echo"]) @@ -286,9 +284,7 @@ class TestCliRunnerVerbose: # verbose 模式下应打印任务生命周期 assert "[verbose]" in captured.out - def test_quiet_flag_disables_verbose( - self, capsys: pytest.CaptureFixture[str] - ) -> None: + def test_quiet_flag_disables_verbose(self, capsys: pytest.CaptureFixture[str]) -> None: """--quiet 应关闭 verbose 输出.""" runner = px.CliRunner({"echo": _echo_graph()}) _ = runner.run(["echo", "--quiet"]) @@ -296,18 +292,14 @@ class TestCliRunnerVerbose: # quiet 模式下不应有 [verbose] 前缀的输出 assert "[verbose]" not in captured.out - def test_verbose_false_constructor_disables_verbose( - self, capsys: pytest.CaptureFixture[str] - ) -> None: + def test_verbose_false_constructor_disables_verbose(self, capsys: pytest.CaptureFixture[str]) -> None: """构造时 verbose=False 应关闭 verbose 输出.""" runner = px.CliRunner({"echo": _echo_graph()}, verbose=False) _ = runner.run(["echo"]) captured = capsys.readouterr() assert "[verbose]" not in captured.out - def test_verbose_prints_command_for_cmd_task( - self, capsys: pytest.CaptureFixture[str] - ) -> None: + def test_verbose_prints_command_for_cmd_task(self, capsys: pytest.CaptureFixture[str]) -> None: """verbose 模式下 cmd 任务应打印执行的命令.""" runner = px.CliRunner({"echo": _echo_graph(msg="verbose-test")}) _ = runner.run(["echo"]) @@ -317,18 +309,14 @@ class TestCliRunnerVerbose: # 应打印返回码 assert "返回码" in captured.out - def test_verbose_prints_success_lifecycle( - self, capsys: pytest.CaptureFixture[str] - ) -> None: + def test_verbose_prints_success_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None: """verbose 模式下成功任务应打印成功信息.""" runner = px.CliRunner({"echo": _echo_graph()}) _ = runner.run(["echo"]) captured = capsys.readouterr() assert "成功" in captured.out - def test_verbose_prints_skip_lifecycle( - self, capsys: pytest.CaptureFixture[str] - ) -> None: + def test_verbose_prints_skip_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None: """verbose 模式下跳过的任务应打印跳过信息.""" graph = px.Graph.from_specs( [ @@ -344,9 +332,7 @@ class TestCliRunnerVerbose: captured = capsys.readouterr() assert "跳过" in captured.out - def test_verbose_prints_failure_lifecycle( - self, capsys: pytest.CaptureFixture[str] - ) -> None: + def test_verbose_prints_failure_lifecycle(self, capsys: pytest.CaptureFixture[str]) -> None: """verbose 模式下失败任务应打印失败信息.""" runner = px.CliRunner({"fail": _failing_graph()}) _ = runner.run(["fail"]) @@ -362,9 +348,7 @@ class TestCliRunnerVerbose: class TestCliRunnerRunFailure: """测试 CliRunner.run 的失败执行路径.""" - def test_run_unknown_command_returns_failure( - self, capsys: pytest.CaptureFixture[str] - ) -> None: + def test_run_unknown_command_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None: """未知命令应返回 1 并打印错误.""" runner = px.CliRunner({"clean": _echo_graph()}) exit_code = runner.run(["unknown"]) @@ -373,9 +357,7 @@ class TestCliRunnerRunFailure: assert "未知命令" in captured.err assert "clean" in captured.err - def test_run_no_command_returns_failure( - self, capsys: pytest.CaptureFixture[str] - ) -> None: + def test_run_no_command_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None: """无命令时应返回 1 并打印帮助.""" runner = px.CliRunner({"clean": _echo_graph()}) exit_code = runner.run([]) @@ -389,9 +371,7 @@ class TestCliRunnerRunFailure: exit_code = runner.run(["fail"]) assert exit_code == CliExitCode.FAILURE.value - def test_run_failing_task_prints_error( - self, capsys: pytest.CaptureFixture[str] - ) -> None: + def test_run_failing_task_prints_error(self, capsys: pytest.CaptureFixture[str]) -> None: """任务失败时应打印错误信息.""" runner = px.CliRunner({"fail": _failing_graph()}) _ = runner.run(["fail"]) @@ -445,9 +425,7 @@ class TestCliRunnerList: class TestCliRunnerErrorHandling: """测试错误处理.""" - def test_keyboard_interrupt_returns_130( - self, capsys: pytest.CaptureFixture[str] - ) -> None: + def test_keyboard_interrupt_returns_130(self, capsys: pytest.CaptureFixture[str]) -> None: """KeyboardInterrupt 应返回 130.""" runner = px.CliRunner({"echo": _echo_graph()}) @@ -460,9 +438,7 @@ class TestCliRunnerErrorHandling: captured = capsys.readouterr() assert "取消" in captured.err - def test_pyflowx_error_returns_failure( - self, capsys: pytest.CaptureFixture[str] - ) -> None: + def test_pyflowx_error_returns_failure(self, capsys: pytest.CaptureFixture[str]) -> None: """PyFlowXError 应返回 1.""" runner = px.CliRunner({"echo": _echo_graph()}) @@ -486,9 +462,7 @@ class TestCliRunnerErrorHandling: def raise_custom(*_args: Any, **_kwargs: Any) -> None: raise CustomError("unexpected") - with patch("pyflowx.runner.run", side_effect=raise_custom), pytest.raises( - CustomError - ): + with patch("pyflowx.runner.run", side_effect=raise_custom), pytest.raises(CustomError): _ = runner.run(["echo"]) @@ -512,9 +486,7 @@ class TestCliRunnerRunCli: runner.run_cli(["fail"]) assert exc_info.value.code == CliExitCode.FAILURE.value - def test_run_cli_no_args_uses_sys_argv( - self, monkeypatch: pytest.MonkeyPatch - ) -> None: + def test_run_cli_no_args_uses_sys_argv(self, monkeypatch: pytest.MonkeyPatch) -> None: """run_cli 无参数时应使用 sys.argv.""" monkeypatch.setattr(sys, "argv", ["pymake", "echo"]) runner = px.CliRunner({"echo": _echo_graph()}) @@ -607,12 +579,8 @@ class TestCliRunnerIntegration: """混合 fn 和 cmd 的命令应都能执行.""" runner = px.CliRunner( { - "fn_cmd": px.Graph.from_specs( - [px.TaskSpec("fn", fn=lambda: "fn-result")] - ), - "cmd_cmd": px.Graph.from_specs( - [px.TaskSpec("cmd", cmd=[*ECHO_CMD, "cmd-result"])] - ), + "fn_cmd": px.Graph.from_specs([px.TaskSpec("fn", fn=lambda: "fn-result")]), + "cmd_cmd": px.Graph.from_specs([px.TaskSpec("cmd", cmd=[*ECHO_CMD, "cmd-result"])]), } ) assert runner.run(["fn_cmd"]) == CliExitCode.SUCCESS.value @@ -629,9 +597,53 @@ class TestCliRunnerIntegration: else: ls_cmd = ["ls"] - graph = px.Graph.from_specs( - [px.TaskSpec("ls", cmd=ls_cmd, cwd=Path(tmpdir))] - ) + graph = px.Graph.from_specs([px.TaskSpec("ls", cmd=ls_cmd, cwd=Path(tmpdir))]) runner = px.CliRunner({"ls": graph}) exit_code = runner.run(["ls"]) assert exit_code == CliExitCode.SUCCESS.value + + +# ---------------------------------------------------------------------- # +# 构造校验 (补充覆盖) +# ---------------------------------------------------------------------- # +class TestCliRunnerConstructionValidation: + """测试 CliRunner 的构造校验 (补充覆盖).""" + + def test_non_graph_value_raises_type_error(self) -> None: + """非 Graph 值应抛出 TypeError (覆盖 runner.py line 119).""" + with pytest.raises(TypeError, match="必须是 Graph 实例"): + _ = px.CliRunner(graphs={"bad": "not a graph"}) # type: ignore[dict-item] + + def test_non_graph_value_dict_raises_type_error(self) -> None: + """dict 中包含非 Graph 值应抛出 TypeError.""" + with pytest.raises(TypeError, match="必须是 Graph 实例"): + _ = px.CliRunner(graphs={"good": _echo_graph(), "bad": 123}) # type: ignore[dict-item] + + +# ---------------------------------------------------------------------- # +# _apply_verbose_to_graph (补充覆盖) +# ---------------------------------------------------------------------- # +class TestApplyVerboseToGraph: + """测试 _apply_verbose_to_graph 函数 (补充覆盖).""" + + def test_specs_with_matching_verbose_are_kept(self) -> None: + """spec.verbose 已与目标值匹配时应保留原 spec (覆盖 runner.py line 57).""" + from pyflowx.runner import _apply_verbose_to_graph + + # 创建 verbose=True 的 spec + graph = px.Graph.from_specs([px.TaskSpec("a", cmd=[*ECHO_CMD, "a"], verbose=True)]) + # 应用 verbose=True, spec.verbose 已匹配, 应保留原 spec + new_graph = _apply_verbose_to_graph(graph, verbose=True) + new_spec = new_graph.spec("a") + assert new_spec.verbose is True + + def test_specs_with_non_matching_verbose_are_replaced(self) -> None: + """spec.verbose 与目标值不匹配时应替换 (覆盖 else 分支).""" + from pyflowx.runner import _apply_verbose_to_graph + + # 创建 verbose=False 的 spec + graph = px.Graph.from_specs([px.TaskSpec("a", cmd=[*ECHO_CMD, "a"], verbose=False)]) + # 应用 verbose=True, spec.verbose 不匹配, 应替换 + new_graph = _apply_verbose_to_graph(graph, verbose=True) + new_spec = new_graph.spec("a") + assert new_spec.verbose is True diff --git a/tests/test_task_edge_cases.py b/tests/test_task_edge_cases.py index aedaf77..71cafd0 100644 --- a/tests/test_task_edge_cases.py +++ b/tests/test_task_edge_cases.py @@ -1,8 +1,10 @@ """Tests for task module edge cases.""" +import subprocess import sys import tempfile from pathlib import Path +from unittest.mock import patch import pytest @@ -140,3 +142,72 @@ def test_taskspec_conditions_multiple_one_false(): ) assert spec.should_execute() is False + + +def test_taskspec_list_cmd_timeout_mocked(): + """Test TaskSpec._wrap_cmd handles list command timeout (mocked).""" + spec = TaskSpec("test", cmd=["sleep", "10"], timeout=0.1) + wrapped_fn = spec.effective_fn + + with patch( + "subprocess.run", side_effect=subprocess.TimeoutExpired(cmd=["sleep", "10"], timeout=0.1) + ), pytest.raises(RuntimeError, match="命令执行超时"): + _ = wrapped_fn() + + +def test_taskspec_shell_cmd_timeout_mocked(): + """Test TaskSpec._wrap_cmd handles shell command timeout (mocked).""" + spec = TaskSpec("test", cmd="sleep 10", timeout=0.1) + wrapped_fn = spec.effective_fn + + with patch("subprocess.run", side_effect=subprocess.TimeoutExpired(cmd="sleep 10", timeout=0.1)), pytest.raises( + RuntimeError, match="Shell 命令执行超时" + ): + _ = wrapped_fn() + + +def test_taskspec_shell_cmd_file_not_found_mocked(): + """Test TaskSpec._wrap_cmd handles shell command FileNotFoundError (mocked).""" + spec = TaskSpec("test", cmd="nonexistent_shell_command") + wrapped_fn = spec.effective_fn + + with patch("subprocess.run", side_effect=FileNotFoundError("not found")), pytest.raises( + RuntimeError, match="Shell 命令未找到" + ): + _ = wrapped_fn() + + +def test_taskspec_shell_cmd_with_cwd_verbose(capsys): + """Test TaskSpec._wrap_cmd with shell command, cwd and verbose=True.""" + with tempfile.TemporaryDirectory() as tmpdir: + if sys.platform == "win32": + shell_cmd = "cmd /c echo hello" + else: + shell_cmd = "echo hello" + spec = TaskSpec("test", cmd=shell_cmd, cwd=Path(tmpdir), verbose=True) + wrapped_fn = spec.effective_fn + result = wrapped_fn() + assert result is None + captured = capsys.readouterr() + assert "执行 Shell" in captured.out + assert "工作目录" in captured.out + + +def test_taskspec_list_cmd_os_error_mocked(): + """Test TaskSpec._wrap_cmd handles list command OSError (mocked).""" + spec = TaskSpec("test", cmd=["ls"]) + wrapped_fn = spec.effective_fn + + with patch("subprocess.run", side_effect=OSError("os error")), pytest.raises(RuntimeError, match="命令执行异常"): + _ = wrapped_fn() + + +def test_taskspec_shell_cmd_os_error_mocked(): + """Test TaskSpec._wrap_cmd handles shell command OSError (mocked).""" + spec = TaskSpec("test", cmd="ls") + wrapped_fn = spec.effective_fn + + with patch("subprocess.run", side_effect=OSError("os error")), pytest.raises( + RuntimeError, match="Shell 命令执行异常" + ): + _ = wrapped_fn()