diff --git a/pyproject.toml b/pyproject.toml index ff71207..9834604 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ version = "0.1.7" [project.scripts] autofmt = "pyflowx.cli.autofmt:main" bumpver = "pyflowx.cli.bumpversion:main" -clrscr = "pyflowx.cli.clearscreen:main" +cls = "pyflowx.cli.clearscreen:main" envpy = "pyflowx.cli.envpy:main" envqt = "pyflowx.cli.envqt:main" envrs = "pyflowx.cli.envrs:main" diff --git a/src/pyflowx/cli/clearscreen.py b/src/pyflowx/cli/clearscreen.py index 9f13846..19f35f0 100644 --- a/src/pyflowx/cli/clearscreen.py +++ b/src/pyflowx/cli/clearscreen.py @@ -5,7 +5,6 @@ from __future__ import annotations -import os import subprocess import pyflowx as px @@ -17,52 +16,17 @@ from pyflowx.conditions import Constants def clear_screen() -> None: - """清屏.""" - if Constants.IS_WINDOWS: - os.system("cls") - else: - os.system("clear") - - -def clear_screen_python() -> None: - """Python 方式清屏 (跨平台).""" - print("\033[2J\033[H", end="") - - -def clear_screen_cmd() -> None: """使用系统命令清屏.""" if Constants.IS_WINDOWS: subprocess.run(["cmd", "/c", "cls"], check=False) else: subprocess.run(["clear"], check=False) - -# ============================================================================ -# TaskSpec 定义 -# ============================================================================ - -clearscreen: px.TaskSpec = px.TaskSpec("clearscreen", fn=clear_screen) -clearscreen_py: px.TaskSpec = px.TaskSpec("clearscreen_py", fn=clear_screen_python) -clearscreen_cmd: px.TaskSpec = px.TaskSpec("clearscreen_cmd", fn=clear_screen_cmd) - - -# ============================================================================ -# CLI Runner -# ============================================================================ + print("\033[2J\033[H", end="") + print("ClearScreen - 清屏工具") def main() -> None: """清屏工具主函数.""" - runner = px.CliRunner( - strategy="thread", - description="ClearScreen - 清屏工具", - graphs={ - # 清屏 (os.system) - "c": px.Graph.from_specs([clearscreen]), - # 清屏 (Python) - "p": px.Graph.from_specs([clearscreen_py]), - # 清屏 (cmd) - "cmd": px.Graph.from_specs([clearscreen_cmd]), - }, - ) - runner.run_cli() + graph = px.Graph.from_specs([px.TaskSpec("clearscreen", fn=clear_screen)]) + px.run(graph, strategy="thread") diff --git a/src/pyflowx/executors.py b/src/pyflowx/executors.py index 9c28bba..dfff9b1 100644 --- a/src/pyflowx/executors.py +++ b/src/pyflowx/executors.py @@ -443,7 +443,7 @@ def run( *, max_workers: int | None = None, dry_run: bool = False, - verbose: bool = True, + verbose: bool = False, on_event: EventCallback | None = None, state: StateBackend | None = None, ) -> RunReport: diff --git a/tests/cli/test_autofmt.py b/tests/cli/test_autofmt.py new file mode 100644 index 0000000..8927a73 --- /dev/null +++ b/tests/cli/test_autofmt.py @@ -0,0 +1,209 @@ +"""Tests for cli.autofmt module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +import pyflowx as px +from pyflowx.cli import autofmt + + +# ---------------------------------------------------------------------- # +# auto_add_docstrings +# ---------------------------------------------------------------------- # +class TestAutoAddDocstrings: + """Test auto_add_docstrings function.""" + + def test_auto_add_docstrings_to_file(self, tmp_path: Path) -> None: + """Should add docstrings to Python file.""" + test_file = tmp_path / "test.py" + test_file.write_text("def test_func():\n pass\n") + + with patch.object(autofmt, "add_docstring_to_file") as mock_add: + autofmt.auto_add_docstrings(tmp_path) + # Should call add_docstring_to_file for each Python file + assert mock_add.called + + def test_auto_add_docstrings_skips_non_python_files(self, tmp_path: Path) -> None: + """Should skip non-Python files.""" + text_file = tmp_path / "test.txt" + text_file.write_text("not a python file") + + with patch.object(autofmt, "add_docstring_to_file") as mock_add: + autofmt.auto_add_docstrings(tmp_path) + # Should not call add_docstring_to_file for non-Python files + assert not mock_add.called + + +# ---------------------------------------------------------------------- # +# sync_pyproject_config +# ---------------------------------------------------------------------- # +class TestSyncPyprojectConfig: + """Test sync_pyproject_config function.""" + + def test_sync_pyproject_config_creates_file(self, tmp_path: Path) -> None: + """Should create pyproject.toml if it doesn't exist.""" + with patch.object(Path, "exists", return_value=False), \ + patch.object(Path, "write_text") as mock_write: + autofmt.sync_pyproject_config(tmp_path) + # Should create pyproject.toml + assert mock_write.called + + def test_sync_pyproject_config_updates_file(self, tmp_path: Path) -> None: + """Should update existing pyproject.toml.""" + pyproject = tmp_path / "pyproject.toml" + pyproject.write_text("[tool.ruff]\n") + + with patch.object(Path, "exists", return_value=True), \ + patch.object(Path, "read_text", return_value="[tool.ruff]\n"), \ + patch.object(Path, "write_text") as mock_write: + autofmt.sync_pyproject_config(tmp_path) + # Should update pyproject.toml + assert mock_write.called + + +# ---------------------------------------------------------------------- # +# format_all +# ---------------------------------------------------------------------- # +class TestFormatAll: + """Test format_all function.""" + + def test_format_all_runs_ruff_format(self) -> None: + """Should run ruff format.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + autofmt.format_all(Path()) + # Should call ruff format + assert mock_run.called + + def test_format_all_runs_ruff_check(self) -> None: + """Should run ruff check.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + autofmt.format_all(Path()) + # Should call ruff check + assert mock_run.call_count >= 2 + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_fmt_default_target(self) -> None: + """main() should handle fmt with default target.""" + with patch("sys.argv", ["autofmt", "fmt"]), \ + patch.object(px, "run") as mock_run: + autofmt.main() + assert mock_run.called + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + for spec in specs.values(): + assert "ruff" in spec.cmd + assert "format" in spec.cmd + assert "." in spec.cmd + + def test_main_fmt_custom_target(self) -> None: + """main() should handle fmt with custom target.""" + with patch("sys.argv", ["autofmt", "fmt", "--target", "src"]), \ + patch.object(px, "run") as mock_run: + autofmt.main() + assert mock_run.called + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + for spec in specs.values(): + assert "ruff" in spec.cmd + assert "format" in spec.cmd + assert "src" in spec.cmd + + def test_main_lint_default_target(self) -> None: + """main() should handle lint with default target.""" + with patch("sys.argv", ["autofmt", "lint"]), \ + patch.object(px, "run") as mock_run: + autofmt.main() + assert mock_run.called + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + for spec in specs.values(): + assert "ruff" in spec.cmd + assert "check" in spec.cmd + + def test_main_lint_with_fix(self) -> None: + """main() should handle lint with fix.""" + with patch("sys.argv", ["autofmt", "lint", "--fix"]), \ + patch.object(px, "run") as mock_run: + autofmt.main() + assert mock_run.called + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + for spec in specs.values(): + assert "ruff" in spec.cmd + assert "check" in spec.cmd + assert "--fix" in spec.cmd + assert "--unsafe-fixes" in spec.cmd + + def test_main_lint_custom_target(self) -> None: + """main() should handle lint with custom target.""" + with patch("sys.argv", ["autofmt", "lint", "--target", "src"]), \ + patch.object(px, "run") as mock_run: + autofmt.main() + assert mock_run.called + + def test_main_doc_default_root(self) -> None: + """main() should handle doc with default root.""" + with patch("sys.argv", ["autofmt", "doc"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(autofmt, "auto_add_docstrings"): + autofmt.main() + assert mock_run.called + + def test_main_doc_custom_root(self) -> None: + """main() should handle doc with custom root.""" + with patch("sys.argv", ["autofmt", "doc", "--root-dir", "src"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(autofmt, "auto_add_docstrings"): + autofmt.main() + assert mock_run.called + + def test_main_sync_default_root(self) -> None: + """main() should handle sync with default root.""" + with patch("sys.argv", ["autofmt", "sync"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(autofmt, "sync_pyproject_config"): + autofmt.main() + assert mock_run.called + + def test_main_sync_custom_root(self) -> None: + """main() should handle sync with custom root.""" + with patch("sys.argv", ["autofmt", "sync", "--root-dir", "src"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(autofmt, "sync_pyproject_config"): + autofmt.main() + assert mock_run.called + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["autofmt"]), pytest.raises(SystemExit) as exc_info: + autofmt.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_specs_with_verbose(self) -> None: + """main() should create TaskSpecs with verbose=True.""" + with patch("sys.argv", ["autofmt", "fmt"]), \ + patch.object(px, "run") as mock_run: + autofmt.main() + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + for spec in specs.values(): + assert spec.verbose is True + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["autofmt", "fmt"]), \ + patch.object(px, "run") as mock_run: + autofmt.main() + assert mock_run.call_args[1]["strategy"] == "thread" \ No newline at end of file diff --git a/tests/cli/test_bumpversion.py b/tests/cli/test_bumpversion.py new file mode 100644 index 0000000..f4a0c68 --- /dev/null +++ b/tests/cli/test_bumpversion.py @@ -0,0 +1,106 @@ +"""Tests for cli.bumpversion module.""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from pyflowx.cli import bumpversion + + +# ---------------------------------------------------------------------- # +# bump_version +# ---------------------------------------------------------------------- # +class TestBumpVersion: + """Test bump_version function.""" + + def test_bump_version_patch(self) -> None: + """Should bump patch version.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + bumpversion.bump_version("patch") + assert mock_run.called + + def test_bump_version_minor(self) -> None: + """Should bump minor version.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + bumpversion.bump_version("minor") + assert mock_run.called + + def test_bump_version_major(self) -> None: + """Should bump major version.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + bumpversion.bump_version("major") + assert mock_run.called + + def test_bump_version_with_tag(self) -> None: + """Should bump version with tag.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + bumpversion.bump_version("patch", tag=True) + assert mock_run.called + + +# ---------------------------------------------------------------------- # +# bump_version_alpha +# ---------------------------------------------------------------------- # +class TestBumpVersionAlpha: + """Test bump_version_alpha function.""" + + def test_bump_version_alpha_patch(self) -> None: + """Should bump alpha patch version.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + bumpversion.bump_version_alpha("patch") + assert mock_run.called + + +# ---------------------------------------------------------------------- # +# TaskSpec definitions +# ---------------------------------------------------------------------- # +class TestTaskSpecDefinitions: + """Test that all TaskSpec definitions are valid.""" + + def test_bump_patch_spec(self) -> None: + """bump_patch spec should be properly defined.""" + assert bumpversion.bump_patch.name == "bump_patch" + assert bumpversion.bump_patch.fn is not None + + def test_bump_minor_spec(self) -> None: + """bump_minor spec should be properly defined.""" + assert bumpversion.bump_minor.name == "bump_minor" + assert bumpversion.bump_minor.fn is not None + + def test_bump_major_spec(self) -> None: + """bump_major spec should be properly defined.""" + assert bumpversion.bump_major.name == "bump_major" + assert bumpversion.bump_major.fn is not None + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_calls_run_cli(self) -> None: + """main() should create a CliRunner and call run_cli().""" + with pytest.raises(SystemExit) as exc_info: + bumpversion.main() + # run_cli() calls sys.exit(), so we should get SystemExit + assert exc_info.value.code in (0, 1, 2) + + def test_main_with_list_argument(self) -> None: + """main() should handle --list argument.""" + with patch("sys.argv", ["bumpversion", "--list"]), pytest.raises(SystemExit) as exc_info: + bumpversion.main() + assert exc_info.value.code == 0 + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["bumpversion"]), pytest.raises(SystemExit) as exc_info: + bumpversion.main() + assert exc_info.value.code == 1 diff --git a/tests/cli/test_clearscreen.py b/tests/cli/test_clearscreen.py new file mode 100644 index 0000000..8910852 --- /dev/null +++ b/tests/cli/test_clearscreen.py @@ -0,0 +1,106 @@ +"""Tests for cli.clearscreen module.""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +import pyflowx as px +from pyflowx.cli import clearscreen +from pyflowx.conditions import Constants + + +# ---------------------------------------------------------------------- # +# clear_screen +# ---------------------------------------------------------------------- # +class TestClearScreen: + """Test clear_screen function.""" + + def test_clear_screen_windows(self) -> None: + """Should clear screen on Windows.""" + if Constants.IS_WINDOWS: + with patch("os.system") as mock_system: + clearscreen.clear_screen() + assert mock_system.called + + def test_clear_screen_linux(self) -> None: + """Should clear screen on Linux.""" + with patch.object(Constants, "IS_WINDOWS", False), \ + patch("os.system") as mock_system: + clearscreen.clear_screen() + assert mock_system.called + + +# ---------------------------------------------------------------------- # +# clear_screen_python +# ---------------------------------------------------------------------- # +class TestClearScreenPython: + """Test clear_screen_python function.""" + + def test_clear_screen_python(self) -> None: + """Should clear screen using Python.""" + with patch("builtins.print") as mock_print: + clearscreen.clear_screen_python() + assert mock_print.called + + +# ---------------------------------------------------------------------- # +# clear_screen_cmd +# ---------------------------------------------------------------------- # +class TestClearScreenCmd: + """Test clear_screen_cmd function.""" + + def test_clear_screen_cmd(self) -> None: + """Should clear screen using cmd.""" + with patch("os.system") as mock_system: + clearscreen.clear_screen_cmd() + assert mock_system.called + + +# ---------------------------------------------------------------------- # +# TaskSpec definitions +# ---------------------------------------------------------------------- # +class TestTaskSpecDefinitions: + """Test that all TaskSpec definitions are valid.""" + + def test_clearscreen_spec(self) -> None: + """clearscreen spec should be properly defined.""" + assert clearscreen.clearscreen.name == "clearscreen" + assert clearscreen.clearscreen.fn is not None + + def test_clearscreen_py_spec(self) -> None: + """clearscreen_py spec should be properly defined.""" + assert clearscreen.clearscreen_py.name == "clearscreen_py" + assert clearscreen.clearscreen_py.fn is not None + + def test_clearscreen_cmd_spec(self) -> None: + """clearscreen_cmd spec should be properly defined.""" + assert clearscreen.clearscreen_cmd.name == "clearscreen_cmd" + assert clearscreen.clearscreen_cmd.fn is not None + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_calls_run_cli(self) -> None: + """main() should create a CliRunner and call run_cli().""" + with pytest.raises(SystemExit) as exc_info: + clearscreen.main() + # run_cli() calls sys.exit(), so we should get SystemExit + assert exc_info.value.code in (0, 1, 2) + + def test_main_with_list_argument(self) -> None: + """main() should handle --list argument.""" + with patch("sys.argv", ["clearscreen", "--list"]), pytest.raises(SystemExit) as exc_info: + clearscreen.main() + assert exc_info.value.code == 0 + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["clearscreen"]), pytest.raises(SystemExit) as exc_info: + clearscreen.main() + assert exc_info.value.code == 1 \ No newline at end of file diff --git a/tests/cli/test_envpy.py b/tests/cli/test_envpy.py new file mode 100644 index 0000000..9d89f63 --- /dev/null +++ b/tests/cli/test_envpy.py @@ -0,0 +1,111 @@ +"""Tests for cli.envpy module.""" + +from __future__ import annotations + +import os +from pathlib import Path +from unittest.mock import patch + +import pytest + +import pyflowx as px +from pyflowx.cli import envpy + + +# ---------------------------------------------------------------------- # +# set_pip_mirror +# ---------------------------------------------------------------------- # +class TestSetPipMirror: + """Test set_pip_mirror function.""" + + def test_set_pip_mirror_tsinghua(self, tmp_path: Path) -> None: + """Should set tsinghua mirror.""" + with patch.object(Path, "home", return_value=tmp_path): + envpy.set_pip_mirror("tsinghua") + # Check pip config + pip_config = tmp_path / "pip" / "pip.ini" + if envpy.Constants.IS_WINDOWS: + assert pip_config.exists() or (tmp_path / "pip" / "pip.conf").exists() + + def test_set_pip_mirror_aliyun(self, tmp_path: Path) -> None: + """Should set aliyun mirror.""" + with patch.object(Path, "home", return_value=tmp_path): + envpy.set_pip_mirror("aliyun") + # Check pip config + pip_dir = tmp_path / "pip" + assert pip_dir.exists() + + def test_set_pip_mirror_with_token(self, tmp_path: Path) -> None: + """Should set mirror with token.""" + with patch.object(Path, "home", return_value=tmp_path): + envpy.set_pip_mirror("tsinghua", token="test_token") + # Check that token is set + + def test_set_pip_mirror_creates_pip_dir(self, tmp_path: Path) -> None: + """Should create pip directory if it doesn't exist.""" + pip_dir = tmp_path / "pip" + with patch.object(Path, "home", return_value=tmp_path): + envpy.set_pip_mirror("tsinghua") + assert pip_dir.exists() + assert pip_dir.is_dir() + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_mirror_tsinghua(self) -> None: + """main() should handle mirror tsinghua command.""" + with patch("sys.argv", ["envpy", "mirror", "tsinghua"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(envpy, "set_pip_mirror"): + envpy.main() + assert mock_run.called + + def test_main_mirror_aliyun(self) -> None: + """main() should handle mirror aliyun command.""" + with patch("sys.argv", ["envpy", "mirror", "aliyun"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(envpy, "set_pip_mirror"): + envpy.main() + assert mock_run.called + + def test_main_mirror_with_token(self) -> None: + """main() should handle mirror with token.""" + with patch("sys.argv", ["envpy", "mirror", "tsinghua", "--token", "test_token"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(envpy, "set_pip_mirror"): + envpy.main() + assert mock_run.called + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["envpy"]), pytest.raises(SystemExit) as exc_info: + envpy.main() + assert exc_info.value.code == 2 + + def test_main_invalid_mirror_shows_error(self) -> None: + """main() with invalid mirror should show error.""" + with patch("sys.argv", ["envpy", "mirror", "invalid"]), pytest.raises(SystemExit) as exc_info: + envpy.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_spec_with_correct_name(self) -> None: + """main() should create TaskSpec with correct name.""" + with patch("sys.argv", ["envpy", "mirror", "tsinghua"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(envpy, "set_pip_mirror"): + envpy.main() + graph = mock_run.call_args[0][0] + task_names = list(graph.all_specs().keys()) + assert "set_pip_mirror" in task_names + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["envpy", "mirror", "tsinghua"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(envpy, "set_pip_mirror"): + envpy.main() + assert mock_run.call_args[1]["strategy"] == "thread" \ No newline at end of file diff --git a/tests/cli/test_envqt.py b/tests/cli/test_envqt.py new file mode 100644 index 0000000..3204fb4 --- /dev/null +++ b/tests/cli/test_envqt.py @@ -0,0 +1,50 @@ +"""Tests for cli.envqt module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch + +import pytest + +import pyflowx as px +from pyflowx.cli import envqt + + +# ---------------------------------------------------------------------- # +# set_qt_mirror +# ---------------------------------------------------------------------- # +class TestSetQtMirror: + """Test set_qt_mirror function.""" + + def test_set_qt_mirror(self, tmp_path: Path) -> None: + """Should set Qt mirror.""" + with patch.object(Path, "home", return_value=tmp_path): + envqt.set_qt_mirror() + # Check Qt config + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_calls_run_cli(self) -> None: + """main() should create a CliRunner and call run_cli().""" + with pytest.raises(SystemExit) as exc_info: + envqt.main() + # run_cli() calls sys.exit(), so we should get SystemExit + assert exc_info.value.code in (0, 1, 2) + + def test_main_with_list_argument(self) -> None: + """main() should handle --list argument.""" + with patch("sys.argv", ["envqt", "--list"]), pytest.raises(SystemExit) as exc_info: + envqt.main() + assert exc_info.value.code == 0 + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["envqt"]), pytest.raises(SystemExit) as exc_info: + envqt.main() + assert exc_info.value.code == 1 \ No newline at end of file diff --git a/tests/cli/test_envrs.py b/tests/cli/test_envrs.py new file mode 100644 index 0000000..4b0c235 --- /dev/null +++ b/tests/cli/test_envrs.py @@ -0,0 +1,214 @@ +"""Tests for cli.envrs module.""" + +from __future__ import annotations + +import os +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +import pyflowx as px +from pyflowx.cli import envrs + + +# ---------------------------------------------------------------------- # +# set_rust_mirror +# ---------------------------------------------------------------------- # +class TestSetRustMirror: + """Test set_rust_mirror function.""" + + def test_set_rust_mirror_aliyun(self, tmp_path: Path) -> None: + """Should set aliyun mirror.""" + with patch.object(Path, "home", return_value=tmp_path): + envrs.set_rust_mirror("aliyun") + # Check environment variables + assert os.environ.get("RUSTUP_DIST_SERVER") == "https://mirrors.aliyun.com/rustup" + assert os.environ.get("RUSTUP_UPDATE_ROOT") == "https://mirrors.aliyun.com/rustup/rustup" + # Check cargo config + cargo_config = tmp_path / ".cargo" / "config.toml" + assert cargo_config.exists() + content = cargo_config.read_text() + assert "aliyun" in content + + def test_set_rust_mirror_ustc(self, tmp_path: Path) -> None: + """Should set ustc mirror.""" + with patch.object(Path, "home", return_value=tmp_path): + envrs.set_rust_mirror("ustc") + assert os.environ.get("RUSTUP_DIST_SERVER") == "https://mirrors.ustc.edu.cn/rust-static" + assert os.environ.get("RUSTUP_UPDATE_ROOT") == "https://mirrors.ustc.edu.cn/rust-static/rustup" + + def test_set_rust_mirror_tsinghua(self, tmp_path: Path) -> None: + """Should set tsinghua mirror.""" + with patch.object(Path, "home", return_value=tmp_path): + envrs.set_rust_mirror("tsinghua") + assert os.environ.get("RUSTUP_DIST_SERVER") == "https://mirrors.tuna.tsinghua.edu.cn/rustup" + assert os.environ.get("RUSTUP_UPDATE_ROOT") == "https://mirrors.tuna.tsinghua.edu.cn/rustup/rustup" + + def test_set_rust_mirror_unknown_uses_default(self, tmp_path: Path) -> None: + """Should use default mirror for unknown mirror name.""" + with patch.object(Path, "home", return_value=tmp_path): + envrs.set_rust_mirror("unknown") + # Should use default mirror (tsinghua) + assert os.environ.get("RUSTUP_DIST_SERVER") == "https://mirrors.tuna.tsinghua.edu.cn/rustup" + + def test_set_rust_mirror_creates_cargo_dir(self, tmp_path: Path) -> None: + """Should create .cargo directory if it doesn't exist.""" + cargo_dir = tmp_path / ".cargo" + with patch.object(Path, "home", return_value=tmp_path): + envrs.set_rust_mirror("aliyun") + assert cargo_dir.exists() + assert cargo_dir.is_dir() + + def test_set_rust_mirror_prints_message(self, capsys: pytest.CaptureFixture[str]) -> None: + """Should print mirror name.""" + with patch.object(Path, "home", return_value=Path("/tmp")): + envrs.set_rust_mirror("aliyun") + captured = capsys.readouterr() + assert "已设置 Rust 镜像源: aliyun" in captured.out + + +# ---------------------------------------------------------------------- # +# install_rust +# ---------------------------------------------------------------------- # +class TestInstallRust: + """Test install_rust function.""" + + def test_install_rust_stable(self) -> None: + """Should install stable Rust.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + envrs.install_rust("stable") + mock_run.assert_called_once_with(["rustup", "toolchain", "install", "stable"], check=True) + + def test_install_rust_nightly(self) -> None: + """Should install nightly Rust.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + envrs.install_rust("nightly") + mock_run.assert_called_once_with(["rustup", "toolchain", "install", "nightly"], check=True) + + def test_install_rust_beta(self) -> None: + """Should install beta Rust.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + envrs.install_rust("beta") + mock_run.assert_called_once_with(["rustup", "toolchain", "install", "beta"], check=True) + + def test_install_rust_file_not_found(self) -> None: + """Should raise FileNotFoundError when rustup not found.""" + with patch("subprocess.run", side_effect=FileNotFoundError): + with pytest.raises(FileNotFoundError): + envrs.install_rust("stable") + + def test_install_rust_prints_message(self, capsys: pytest.CaptureFixture[str]) -> None: + """Should print installation message.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + envrs.install_rust("stable") + captured = capsys.readouterr() + assert "已安装 Rust stable" in captured.out + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_mirror_aliyun(self) -> None: + """main() should handle mirror aliyun command.""" + with patch("sys.argv", ["envrs", "mirror", "aliyun"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(envrs, "set_rust_mirror"): + envrs.main() + assert mock_run.called + + def test_main_mirror_ustc(self) -> None: + """main() should handle mirror ustc command.""" + with patch("sys.argv", ["envrs", "mirror", "ustc"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(envrs, "set_rust_mirror"): + envrs.main() + assert mock_run.called + + def test_main_mirror_tsinghua(self) -> None: + """main() should handle mirror tsinghua command.""" + with patch("sys.argv", ["envrs", "mirror", "tsinghua"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(envrs, "set_rust_mirror"): + envrs.main() + assert mock_run.called + + def test_main_mirror_default(self) -> None: + """main() should use default mirror when not specified.""" + with patch("sys.argv", ["envrs", "mirror"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(envrs, "set_rust_mirror"): + envrs.main() + assert mock_run.called + + def test_main_install_stable(self) -> None: + """main() should handle install stable command.""" + with patch("sys.argv", ["envrs", "install", "stable"]), \ + patch.object(px, "run") as mock_run: + envrs.main() + assert mock_run.called + + def test_main_install_nightly(self) -> None: + """main() should handle install nightly command.""" + with patch("sys.argv", ["envrs", "install", "nightly"]), \ + patch.object(px, "run") as mock_run: + envrs.main() + assert mock_run.called + + def test_main_install_beta(self) -> None: + """main() should handle install beta command.""" + with patch("sys.argv", ["envrs", "install", "beta"]), \ + patch.object(px, "run") as mock_run: + envrs.main() + assert mock_run.called + + def test_main_install_default(self) -> None: + """main() should use default version when not specified.""" + with patch("sys.argv", ["envrs", "install"]), \ + patch.object(px, "run") as mock_run: + envrs.main() + assert mock_run.called + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["envrs"]), pytest.raises(SystemExit) as exc_info: + envrs.main() + assert exc_info.value.code == 2 + + def test_main_invalid_version_shows_error(self) -> None: + """main() with invalid version should show error.""" + with patch("sys.argv", ["envrs", "install", "invalid"]), pytest.raises(SystemExit) as exc_info: + envrs.main() + assert exc_info.value.code == 2 + + def test_main_invalid_mirror_shows_error(self) -> None: + """main() with invalid mirror should show error.""" + with patch("sys.argv", ["envrs", "mirror", "invalid"]), pytest.raises(SystemExit) as exc_info: + envrs.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_spec_with_verbose(self) -> None: + """main() should create TaskSpec with verbose=True.""" + with patch("sys.argv", ["envrs", "mirror", "aliyun"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(envrs, "set_rust_mirror"): + envrs.main() + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + for spec in specs.values(): + assert spec.verbose is True + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["envrs", "mirror", "aliyun"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(envrs, "set_rust_mirror"): + envrs.main() + assert mock_run.call_args[1]["strategy"] == "thread" \ No newline at end of file diff --git a/tests/cli/test_filedate.py b/tests/cli/test_filedate.py new file mode 100644 index 0000000..d94474e --- /dev/null +++ b/tests/cli/test_filedate.py @@ -0,0 +1,113 @@ +"""Tests for cli.filedate module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch + +import pytest + +import pyflowx as px +from pyflowx.cli import filedate + + +# ---------------------------------------------------------------------- # +# process_files_date +# ---------------------------------------------------------------------- # +class TestProcessFilesDate: + """Test process_files_date function.""" + + def test_process_files_date_add(self, tmp_path: Path) -> None: + """Should add date prefix.""" + test_file = tmp_path / "test.txt" + test_file.write_text("test content") + + with patch.object(filedate, "rename_file_with_date") as mock_rename: + filedate.process_files_date([test_file], clear=False) + assert mock_rename.called + + def test_process_files_date_clear(self, tmp_path: Path) -> None: + """Should clear date prefix.""" + test_file = tmp_path / "2024-01-01_test.txt" + test_file.write_text("test content") + + with patch.object(filedate, "rename_file_with_date") as mock_rename: + filedate.process_files_date([test_file], clear=True) + assert mock_rename.called + + def test_process_files_date_multiple_files(self, tmp_path: Path) -> None: + """Should process multiple files.""" + test_files = [ + tmp_path / "test1.txt", + tmp_path / "test2.txt", + tmp_path / "test3.txt", + ] + for f in test_files: + f.write_text("test content") + + with patch.object(filedate, "rename_file_with_date") as mock_rename: + filedate.process_files_date(test_files, clear=False) + assert mock_rename.call_count == 3 + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_add_single_file(self) -> None: + """main() should handle add command with single file.""" + with patch("sys.argv", ["filedate", "add", "test.txt"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(filedate, "process_files_date"): + filedate.main() + assert mock_run.called + + def test_main_add_multiple_files(self) -> None: + """main() should handle add command with multiple files.""" + with patch("sys.argv", ["filedate", "add", "test1.txt", "test2.txt", "test3.txt"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(filedate, "process_files_date"): + filedate.main() + assert mock_run.called + + def test_main_clear_single_file(self) -> None: + """main() should handle clear command with single file.""" + with patch("sys.argv", ["filedate", "clear", "test.txt"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(filedate, "process_files_date"): + filedate.main() + assert mock_run.called + + def test_main_clear_multiple_files(self) -> None: + """main() should handle clear command with multiple files.""" + with patch("sys.argv", ["filedate", "clear", "test1.txt", "test2.txt", "test3.txt"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(filedate, "process_files_date"): + filedate.main() + assert mock_run.called + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["filedate"]), pytest.raises(SystemExit) as exc_info: + filedate.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_spec_with_correct_name(self) -> None: + """main() should create TaskSpec with correct name.""" + with patch("sys.argv", ["filedate", "add", "test.txt"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(filedate, "process_files_date"): + filedate.main() + graph = mock_run.call_args[0][0] + task_names = list(graph.all_specs().keys()) + assert "process_files_date" in task_names + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["filedate", "add", "test.txt"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(filedate, "process_files_date"): + filedate.main() + assert mock_run.call_args[1]["strategy"] == "thread" \ No newline at end of file diff --git a/tests/cli/test_filelevel.py b/tests/cli/test_filelevel.py new file mode 100644 index 0000000..d7a4809 --- /dev/null +++ b/tests/cli/test_filelevel.py @@ -0,0 +1,170 @@ +"""Tests for cli.filelevel module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch + +import pytest + +import pyflowx as px +from pyflowx.cli import filelevel + + +# ---------------------------------------------------------------------- # +# process_files_level +# ---------------------------------------------------------------------- # +class TestProcessFilesLevel: + """Test process_files_level function.""" + + def test_process_files_level_clear(self, tmp_path: Path) -> None: + """Should clear level markers.""" + test_file = tmp_path / "[PUB]test.txt" + test_file.write_text("test content") + + with patch.object(filelevel, "rename_file_with_level") as mock_rename: + filelevel.process_files_level([test_file], level=0) + assert mock_rename.called + + def test_process_files_level_pub(self, tmp_path: Path) -> None: + """Should set PUB level.""" + test_file = tmp_path / "test.txt" + test_file.write_text("test content") + + with patch.object(filelevel, "rename_file_with_level") as mock_rename: + filelevel.process_files_level([test_file], level=1) + assert mock_rename.called + + def test_process_files_level_int(self, tmp_path: Path) -> None: + """Should set INT level.""" + test_file = tmp_path / "test.txt" + test_file.write_text("test content") + + with patch.object(filelevel, "rename_file_with_level") as mock_rename: + filelevel.process_files_level([test_file], level=2) + assert mock_rename.called + + def test_process_files_level_con(self, tmp_path: Path) -> None: + """Should set CON level.""" + test_file = tmp_path / "test.txt" + test_file.write_text("test content") + + with patch.object(filelevel, "rename_file_with_level") as mock_rename: + filelevel.process_files_level([test_file], level=3) + assert mock_rename.called + + def test_process_files_level_cla(self, tmp_path: Path) -> None: + """Should set CLA level.""" + test_file = tmp_path / "test.txt" + test_file.write_text("test content") + + with patch.object(filelevel, "rename_file_with_level") as mock_rename: + filelevel.process_files_level([test_file], level=4) + assert mock_rename.called + + def test_process_files_level_multiple_files(self, tmp_path: Path) -> None: + """Should process multiple files.""" + test_files = [ + tmp_path / "test1.txt", + tmp_path / "test2.txt", + tmp_path / "test3.txt", + ] + for f in test_files: + f.write_text("test content") + + with patch.object(filelevel, "rename_file_with_level") as mock_rename: + filelevel.process_files_level(test_files, level=1) + assert mock_rename.call_count == 3 + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_set_single_file(self) -> None: + """main() should handle set command with single file.""" + with patch("sys.argv", ["filelevel", "set", "test.txt", "--level", "1"]), patch.object( + px, "run" + ) as mock_run, patch.object(filelevel, "process_files_level"): + filelevel.main() + assert mock_run.called + + def test_main_set_multiple_files(self) -> None: + """main() should handle set command with multiple files.""" + with patch( + "sys.argv", ["filelevel", "set", "test1.txt", "test2.txt", "test3.txt", "--level", "2"] + ), patch.object(px, "run") as mock_run, patch.object(filelevel, "process_files_level"): + filelevel.main() + assert mock_run.called + + def test_main_set_level_0(self) -> None: + """main() should handle set command with level 0.""" + with patch("sys.argv", ["filelevel", "set", "test.txt", "--level", "0"]), patch.object( + px, "run" + ) as mock_run, patch.object(filelevel, "process_files_level"): + filelevel.main() + assert mock_run.called + + def test_main_set_level_1(self) -> None: + """main() should handle set command with level 1.""" + with patch("sys.argv", ["filelevel", "set", "test.txt", "--level", "1"]), patch.object( + px, "run" + ) as mock_run, patch.object(filelevel, "process_files_level"): + filelevel.main() + assert mock_run.called + + def test_main_set_level_2(self) -> None: + """main() should handle set command with level 2.""" + with patch("sys.argv", ["filelevel", "set", "test.txt", "--level", "2"]), patch.object( + px, "run" + ) as mock_run, patch.object(filelevel, "process_files_level"): + filelevel.main() + assert mock_run.called + + def test_main_set_level_3(self) -> None: + """main() should handle set command with level 3.""" + with patch("sys.argv", ["filelevel", "set", "test.txt", "--level", "3"]), patch.object( + px, "run" + ) as mock_run, patch.object(filelevel, "process_files_level"): + filelevel.main() + assert mock_run.called + + def test_main_set_level_4(self) -> None: + """main() should handle set command with level 4.""" + with patch("sys.argv", ["filelevel", "set", "test.txt", "--level", "4"]), patch.object( + px, "run" + ) as mock_run, patch.object(filelevel, "process_files_level"): + filelevel.main() + assert mock_run.called + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["filelevel"]), pytest.raises(SystemExit) as exc_info: + filelevel.main() + assert exc_info.value.code == 2 + + def test_main_invalid_level_shows_error(self) -> None: + """main() with invalid level should show error.""" + with patch("sys.argv", ["filelevel", "set", "test.txt", "--level", "5"]), pytest.raises(SystemExit) as exc_info: + filelevel.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_spec_with_correct_name(self) -> None: + """main() should create TaskSpec with correct name.""" + with patch("sys.argv", ["filelevel", "set", "test.txt", "--level", "1"]), patch.object( + px, "run" + ) as mock_run, patch.object(filelevel, "process_files_level"): + filelevel.main() + graph = mock_run.call_args[0][0] + task_names = list(graph.all_specs().keys()) + assert "process_files_level" in task_names + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["filelevel", "set", "test.txt", "--level", "1"]), patch.object( + px, "run" + ) as mock_run, patch.object(filelevel, "process_files_level"): + filelevel.main() + assert mock_run.call_args[1]["strategy"] == "thread" diff --git a/tests/cli/test_folderback.py b/tests/cli/test_folderback.py new file mode 100644 index 0000000..ae8bf46 --- /dev/null +++ b/tests/cli/test_folderback.py @@ -0,0 +1,78 @@ +"""Tests for cli.folderback module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch + +import pytest + +import pyflowx as px +from pyflowx.cli import folderback + + +# ---------------------------------------------------------------------- # +# backup_folder +# ---------------------------------------------------------------------- # +class TestBackupFolder: + """Test backup_folder function.""" + + def test_backup_folder_with_source_and_backup(self, tmp_path: Path) -> None: + """Should backup folder with source and backup paths.""" + source_dir = tmp_path / "source" + source_dir.mkdir() + backup_dir = tmp_path / "backup" + backup_dir.mkdir() + + with patch("shutil.copytree") as mock_copy: + folderback.backup_folder(str(source_dir), str(backup_dir), 5) + assert mock_copy.called + + def test_backup_folder_with_max_backups(self, tmp_path: Path) -> None: + """Should backup folder with max backups.""" + source_dir = tmp_path / "source" + source_dir.mkdir() + backup_dir = tmp_path / "backup" + backup_dir.mkdir() + + with patch("shutil.copytree") as mock_copy: + folderback.backup_folder(str(source_dir), str(backup_dir), 10) + assert mock_copy.called + + +# ---------------------------------------------------------------------- # +# TaskSpec definitions +# ---------------------------------------------------------------------- # +class TestTaskSpecDefinitions: + """Test that all TaskSpec definitions are valid.""" + + def test_folderback_default_spec(self) -> None: + """folderback_default spec should be properly defined.""" + assert folderback.folderback_default.name == "folderback_default" + assert folderback.folderback_default.fn is not None + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_calls_run_cli(self) -> None: + """main() should create a CliRunner and call run_cli().""" + with pytest.raises(SystemExit) as exc_info: + folderback.main() + # run_cli() calls sys.exit(), so we should get SystemExit + assert exc_info.value.code in (0, 1, 2) + + def test_main_with_list_argument(self) -> None: + """main() should handle --list argument.""" + with patch("sys.argv", ["folderback", "--list"]), pytest.raises(SystemExit) as exc_info: + folderback.main() + assert exc_info.value.code == 0 + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["folderback"]), pytest.raises(SystemExit) as exc_info: + folderback.main() + assert exc_info.value.code == 1 \ No newline at end of file diff --git a/tests/cli/test_folderzip.py b/tests/cli/test_folderzip.py new file mode 100644 index 0000000..52fe3fe --- /dev/null +++ b/tests/cli/test_folderzip.py @@ -0,0 +1,72 @@ +"""Tests for cli.folderzip module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch + +import pytest + +import pyflowx as px +from pyflowx.cli import folderzip + + +# ---------------------------------------------------------------------- # +# zip_folder +# ---------------------------------------------------------------------- # +class TestZipFolder: + """Test zip_folder function.""" + + def test_zip_folder_with_source_and_output(self, tmp_path: Path) -> None: + """Should zip folder with source and output paths.""" + source_dir = tmp_path / "source" + source_dir.mkdir() + output_file = tmp_path / "output.zip" + + with patch("zipfile.ZipFile") as mock_zip: + folderzip.zip_folder(str(source_dir), str(output_file)) + assert mock_zip.called + + +# ---------------------------------------------------------------------- # +# unzip_folder +# ---------------------------------------------------------------------- # +class TestUnzipFolder: + """Test unzip_folder function.""" + + def test_unzip_folder_with_zip_and_output(self, tmp_path: Path) -> None: + """Should unzip folder with zip and output paths.""" + zip_file = tmp_path / "test.zip" + zip_file.write_bytes(b"ZIP content") + output_dir = tmp_path / "output" + output_dir.mkdir() + + with patch("zipfile.ZipFile") as mock_zip: + folderzip.unzip_folder(str(zip_file), str(output_dir)) + assert mock_zip.called + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_calls_run_cli(self) -> None: + """main() should create a CliRunner and call run_cli().""" + with pytest.raises(SystemExit) as exc_info: + folderzip.main() + # run_cli() calls sys.exit(), so we should get SystemExit + assert exc_info.value.code in (0, 1, 2) + + def test_main_with_list_argument(self) -> None: + """main() should handle --list argument.""" + with patch("sys.argv", ["folderzip", "--list"]), pytest.raises(SystemExit) as exc_info: + folderzip.main() + assert exc_info.value.code == 0 + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["folderzip"]), pytest.raises(SystemExit) as exc_info: + folderzip.main() + assert exc_info.value.code == 1 \ No newline at end of file diff --git a/tests/cli/test_gittool.py b/tests/cli/test_gittool.py new file mode 100644 index 0000000..b37b25c --- /dev/null +++ b/tests/cli/test_gittool.py @@ -0,0 +1,59 @@ +"""Tests for cli.gittool module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch + +import pytest + +import pyflowx as px +from pyflowx.cli import gittool + + +# ---------------------------------------------------------------------- # +# TaskSpec definitions +# ---------------------------------------------------------------------- # +class TestTaskSpecDefinitions: + """Test that all TaskSpec definitions are valid.""" + + def test_push_spec(self) -> None: + """push spec should be properly defined.""" + assert gittool.push.name == "push" + assert gittool.push.cmd == ["git", "push"] + + def test_pull_spec(self) -> None: + """pull spec should be properly defined.""" + assert gittool.pull.name == "pull" + assert gittool.pull.cmd == ["git", "pull"] + + def test_kill_tgit_spec(self) -> None: + """kill_tgit spec should be properly defined.""" + assert gittool.kill_tgit.name == "task_kill" + assert "taskkill" in gittool.kill_tgit.cmd + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_calls_run_cli(self) -> None: + """main() should create a CliRunner and call run_cli().""" + with pytest.raises(SystemExit) as exc_info: + gittool.main() + # run_cli() calls sys.exit(), so we should get SystemExit + assert exc_info.value.code in (0, 1, 2) + + def test_main_with_list_argument(self) -> None: + """main() should handle --list argument.""" + with patch("sys.argv", ["gittool", "--list"]), pytest.raises(SystemExit) as exc_info: + gittool.main() + assert exc_info.value.code == 0 + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["gittool"]), pytest.raises(SystemExit) as exc_info: + gittool.main() + assert exc_info.value.code == 1 \ No newline at end of file diff --git a/tests/cli/test_lscalc.py b/tests/cli/test_lscalc.py new file mode 100644 index 0000000..a81ad99 --- /dev/null +++ b/tests/cli/test_lscalc.py @@ -0,0 +1,164 @@ +"""Tests for cli.lscalc module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +import pyflowx as px +from pyflowx.cli import lscalc +from pyflowx.conditions import Constants + + +# ---------------------------------------------------------------------- # +# run_ls_dyna +# ---------------------------------------------------------------------- # +class TestRunLsDyna: + """Test run_ls_dyna function.""" + + def test_run_ls_dyna_with_input_file(self) -> None: + """Should run LS-DYNA with input file.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + lscalc.run_ls_dyna("test.k", ncpu=4) + assert mock_run.called + + def test_run_ls_dyna_with_custom_ncpu(self) -> None: + """Should run LS-DYNA with custom CPU count.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + lscalc.run_ls_dyna("test.k", ncpu=8) + assert mock_run.called + # Check that ncpu is passed correctly + + def test_run_ls_dyna_windows_command(self) -> None: + """Should use Windows command format on Windows.""" + if Constants.IS_WINDOWS: + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + lscalc.run_ls_dyna("test.k", ncpu=4) + assert mock_run.called + # Check command format + + def test_run_ls_dyna_linux_command(self) -> None: + """Should use Linux command format on Linux.""" + with patch.object(Constants, "IS_WINDOWS", False), \ + patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + lscalc.run_ls_dyna("test.k", ncpu=4) + assert mock_run.called + + +# ---------------------------------------------------------------------- # +# run_ls_dyna_mpi +# ---------------------------------------------------------------------- # +class TestRunLsDynaMpi: + """Test run_ls_dyna_mpi function.""" + + def test_run_ls_dyna_mpi_with_input_file(self) -> None: + """Should run LS-DYNA MPI with input file.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + lscalc.run_ls_dyna_mpi("test.k", ncpu=4) + assert mock_run.called + + def test_run_ls_dyna_mpi_with_custom_ncpu(self) -> None: + """Should run LS-DYNA MPI with custom CPU count.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + lscalc.run_ls_dyna_mpi("test.k", ncpu=8) + assert mock_run.called + + +# ---------------------------------------------------------------------- # +# check_ls_dyna_status +# ---------------------------------------------------------------------- # +class TestCheckLsDynaStatus: + """Test check_ls_dyna_status function.""" + + def test_check_ls_dyna_status_running(self) -> None: + """Should detect running LS-DYNA process.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(stdout="lsdyna.exe\n", returncode=0) + result = lscalc.check_ls_dyna_status() + assert mock_run.called + + def test_check_ls_dyna_status_not_running(self) -> None: + """Should detect no LS-DYNA process.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(stdout="", returncode=0) + result = lscalc.check_ls_dyna_status() + assert mock_run.called + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_run_with_input_file(self) -> None: + """main() should handle run command with input file.""" + with patch("sys.argv", ["lscalc", "run", "test.k"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(lscalc, "run_ls_dyna"): + lscalc.main() + assert mock_run.called + + def test_main_run_with_custom_ncpu(self) -> None: + """main() should handle run command with custom CPU count.""" + with patch("sys.argv", ["lscalc", "run", "test.k", "--ncpu", "8"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(lscalc, "run_ls_dyna"): + lscalc.main() + assert mock_run.called + + def test_main_mpi_with_input_file(self) -> None: + """main() should handle mpi command with input file.""" + with patch("sys.argv", ["lscalc", "mpi", "test.k"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(lscalc, "run_ls_dyna_mpi"): + lscalc.main() + assert mock_run.called + + def test_main_mpi_with_custom_ncpu(self) -> None: + """main() should handle mpi command with custom CPU count.""" + with patch("sys.argv", ["lscalc", "mpi", "test.k", "--ncpu", "8"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(lscalc, "run_ls_dyna_mpi"): + lscalc.main() + assert mock_run.called + + def test_main_status(self) -> None: + """main() should handle status command.""" + with patch("sys.argv", ["lscalc", "status"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(lscalc, "check_ls_dyna_status"): + lscalc.main() + assert mock_run.called + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["lscalc"]), pytest.raises(SystemExit) as exc_info: + lscalc.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_spec_with_correct_name(self) -> None: + """main() should create TaskSpec with correct name.""" + with patch("sys.argv", ["lscalc", "run", "test.k"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(lscalc, "run_ls_dyna"): + lscalc.main() + graph = mock_run.call_args[0][0] + task_names = list(graph.all_specs().keys()) + assert "run_ls_dyna" in task_names + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["lscalc", "run", "test.k"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(lscalc, "run_ls_dyna"): + lscalc.main() + assert mock_run.call_args[1]["strategy"] == "thread" \ No newline at end of file diff --git a/tests/cli/test_packtool.py b/tests/cli/test_packtool.py new file mode 100644 index 0000000..b1dd56b --- /dev/null +++ b/tests/cli/test_packtool.py @@ -0,0 +1,296 @@ +"""Tests for cli.packtool module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +import pyflowx as px +from pyflowx.cli import packtool + + +# ---------------------------------------------------------------------- # +# pack_source +# ---------------------------------------------------------------------- # +class TestPackSource: + """Test pack_source function.""" + + def test_pack_source_with_project_dir(self, tmp_path: Path) -> None: + """Should pack source from project directory.""" + project_dir = tmp_path / "project" + project_dir.mkdir() + output_dir = tmp_path / "output" + output_dir.mkdir() + + with patch("shutil.make_archive") as mock_archive: + packtool.pack_source(project_dir, output_dir) + assert mock_archive.called + + def test_pack_source_creates_output_dir(self, tmp_path: Path) -> None: + """Should create output directory if it doesn't exist.""" + project_dir = tmp_path / "project" + project_dir.mkdir() + output_dir = tmp_path / "output" + + with patch("shutil.make_archive") as mock_archive: + packtool.pack_source(project_dir, output_dir) + assert output_dir.exists() + + +# ---------------------------------------------------------------------- # +# pack_dependencies +# ---------------------------------------------------------------------- # +class TestPackDependencies: + """Test pack_dependencies function.""" + + def test_pack_dependencies_with_lib_dir(self, tmp_path: Path) -> None: + """Should pack dependencies from lib directory.""" + lib_dir = tmp_path / "lib" + lib_dir.mkdir() + dependencies = ["numpy", "pandas"] + + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + packtool.pack_dependencies(lib_dir, dependencies) + assert mock_run.called + + def test_pack_dependencies_empty_list(self, tmp_path: Path) -> None: + """Should handle empty dependency list.""" + lib_dir = tmp_path / "lib" + lib_dir.mkdir() + + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + packtool.pack_dependencies(lib_dir, []) + # Should still work with empty list + + +# ---------------------------------------------------------------------- # +# pack_wheel +# ---------------------------------------------------------------------- # +class TestPackWheel: + """Test pack_wheel function.""" + + def test_pack_wheel_with_project_dir(self, tmp_path: Path) -> None: + """Should pack wheel from project directory.""" + project_dir = tmp_path / "project" + project_dir.mkdir() + output_dir = tmp_path / "output" + output_dir.mkdir() + + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + packtool.pack_wheel(project_dir, output_dir) + assert mock_run.called + + def test_pack_wheel_creates_output_dir(self, tmp_path: Path) -> None: + """Should create output directory if it doesn't exist.""" + project_dir = tmp_path / "project" + project_dir.mkdir() + output_dir = tmp_path / "output" + + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + packtool.pack_wheel(project_dir, output_dir) + assert output_dir.exists() + + +# ---------------------------------------------------------------------- # +# install_embed_python +# ---------------------------------------------------------------------- # +class TestInstallEmbedPython: + """Test install_embed_python function.""" + + def test_install_embed_python_with_version(self, tmp_path: Path) -> None: + """Should install embedded Python with version.""" + output_dir = tmp_path / "python" + + with patch("subprocess.run") as mock_run, \ + patch.object(Path, "exists", return_value=False): + mock_run.return_value = MagicMock(returncode=0) + packtool.install_embed_python("3.10", output_dir) + assert mock_run.called + + def test_install_embed_python_creates_output_dir(self, tmp_path: Path) -> None: + """Should create output directory if it doesn't exist.""" + output_dir = tmp_path / "python" + + with patch("subprocess.run") as mock_run, \ + patch.object(Path, "exists", return_value=False): + mock_run.return_value = MagicMock(returncode=0) + packtool.install_embed_python("3.10", output_dir) + assert output_dir.exists() + + +# ---------------------------------------------------------------------- # +# create_zip_package +# ---------------------------------------------------------------------- # +class TestCreateZipPackage: + """Test create_zip_package function.""" + + def test_create_zip_package_with_source_dir(self, tmp_path: Path) -> None: + """Should create zip package from source directory.""" + source_dir = tmp_path / "source" + source_dir.mkdir() + output_file = tmp_path / "package.zip" + + with patch("zipfile.ZipFile") as mock_zip: + packtool.create_zip_package(source_dir, output_file) + assert mock_zip.called + + def test_create_zip_package_with_files(self, tmp_path: Path) -> None: + """Should create zip package with files.""" + source_dir = tmp_path / "source" + source_dir.mkdir() + test_file = source_dir / "test.txt" + test_file.write_text("test content") + output_file = tmp_path / "package.zip" + + with patch("zipfile.ZipFile") as mock_zip: + packtool.create_zip_package(source_dir, output_file) + assert mock_zip.called + + +# ---------------------------------------------------------------------- # +# clean_build_dir +# ---------------------------------------------------------------------- # +class TestCleanBuildDir: + """Test clean_build_dir function.""" + + def test_clean_build_dir_removes_directory(self, tmp_path: Path) -> None: + """Should remove build directory.""" + build_dir = tmp_path / "build" + build_dir.mkdir() + + with patch("shutil.rmtree") as mock_rmtree: + packtool.clean_build_dir(build_dir) + assert mock_rmtree.called + + def test_clean_build_dir_nonexistent(self, tmp_path: Path) -> None: + """Should handle nonexistent build directory.""" + build_dir = tmp_path / "build" + + with patch.object(Path, "exists", return_value=False): + packtool.clean_build_dir(build_dir) + # Should not raise error + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_src_default_dirs(self) -> None: + """main() should handle src command with default dirs.""" + with patch("sys.argv", ["packtool", "src"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "pack_source"): + packtool.main() + assert mock_run.called + + def test_main_src_custom_dirs(self) -> None: + """main() should handle src command with custom dirs.""" + with patch("sys.argv", ["packtool", "src", "--project-dir", "project", "--output-dir", "output"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "pack_source"): + packtool.main() + assert mock_run.called + + def test_main_deps_default_dir(self) -> None: + """main() should handle deps command with default dir.""" + with patch("sys.argv", ["packtool", "deps"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "pack_dependencies"): + packtool.main() + assert mock_run.called + + def test_main_deps_with_dependencies(self) -> None: + """main() should handle deps command with dependencies.""" + with patch("sys.argv", ["packtool", "deps", "--lib-dir", "lib", "numpy", "pandas"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "pack_dependencies"): + packtool.main() + assert mock_run.called + + def test_main_wheel_default_dirs(self) -> None: + """main() should handle wheel command with default dirs.""" + with patch("sys.argv", ["packtool", "wheel"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "pack_wheel"): + packtool.main() + assert mock_run.called + + def test_main_wheel_custom_dirs(self) -> None: + """main() should handle wheel command with custom dirs.""" + with patch("sys.argv", ["packtool", "wheel", "--project-dir", "project", "--output-dir", "output"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "pack_wheel"): + packtool.main() + assert mock_run.called + + def test_main_embed_default_version(self) -> None: + """main() should handle embed command with default version.""" + with patch("sys.argv", ["packtool", "embed"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "install_embed_python"): + packtool.main() + assert mock_run.called + + def test_main_embed_custom_version(self) -> None: + """main() should handle embed command with custom version.""" + with patch("sys.argv", ["packtool", "embed", "--version", "3.11", "--output-dir", "python"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "install_embed_python"): + packtool.main() + assert mock_run.called + + def test_main_zip_default_params(self) -> None: + """main() should handle zip command with default params.""" + with patch("sys.argv", ["packtool", "zip"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "create_zip_package"): + packtool.main() + assert mock_run.called + + def test_main_zip_custom_params(self) -> None: + """main() should handle zip command with custom params.""" + with patch("sys.argv", ["packtool", "zip", "--source-dir", "source", "--output-file", "package.zip"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "create_zip_package"): + packtool.main() + assert mock_run.called + + def test_main_clean(self) -> None: + """main() should handle clean command.""" + with patch("sys.argv", ["packtool", "clean"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "clean_build_dir"): + packtool.main() + assert mock_run.called + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["packtool"]), pytest.raises(SystemExit) as exc_info: + packtool.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_spec_with_correct_name(self) -> None: + """main() should create TaskSpec with correct name.""" + with patch("sys.argv", ["packtool", "src"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "pack_source"): + packtool.main() + graph = mock_run.call_args[0][0] + task_names = list(graph.all_specs().keys()) + assert "pack_source" in task_names + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["packtool", "src"]), \ + patch.object(px, "run") as mock_run, \ + patch.object(packtool, "pack_source"): + packtool.main() + assert mock_run.call_args[1]["strategy"] == "thread" \ No newline at end of file diff --git a/tests/cli/test_pdftool.py b/tests/cli/test_pdftool.py new file mode 100644 index 0000000..a7b77fd --- /dev/null +++ b/tests/cli/test_pdftool.py @@ -0,0 +1,468 @@ +"""Tests for cli.pdftool module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +import pyflowx as px +from pyflowx.cli import pdftool + + +# ---------------------------------------------------------------------- # +# pdf_merge +# ---------------------------------------------------------------------- # +class TestPdfMerge: + """Test pdf_merge function.""" + + def test_pdf_merge_single_file(self, tmp_path: Path) -> None: + """Should merge single PDF file.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_file = tmp_path / "merged.pdf" + + with patch("pypdf.PdfMerger") as mock_merger: + pdftool.pdf_merge([input_file], output_file) + assert mock_merger.called + + def test_pdf_merge_multiple_files(self, tmp_path: Path) -> None: + """Should merge multiple PDF files.""" + input_files = [ + tmp_path / "input1.pdf", + tmp_path / "input2.pdf", + tmp_path / "input3.pdf", + ] + for f in input_files: + f.write_bytes(b"PDF content") + output_file = tmp_path / "merged.pdf" + + with patch("pypdf.PdfMerger") as mock_merger: + pdftool.pdf_merge(input_files, output_file) + assert mock_merger.called + + +# ---------------------------------------------------------------------- # +# pdf_split +# ---------------------------------------------------------------------- # +class TestPdfSplit: + """Test pdf_split function.""" + + def test_pdf_split_single_file(self, tmp_path: Path) -> None: + """Should split single PDF file.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_dir = tmp_path / "split" + output_dir.mkdir() + + with patch("pypdf.PdfReader") as mock_reader: + mock_reader.return_value.pages = [MagicMock(), MagicMock()] + pdftool.pdf_split(input_file, output_dir) + assert mock_reader.called + + def test_pdf_split_creates_output_dir(self, tmp_path: Path) -> None: + """Should create output directory if it doesn't exist.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_dir = tmp_path / "split" + + with patch("pypdf.PdfReader") as mock_reader: + mock_reader.return_value.pages = [MagicMock()] + pdftool.pdf_split(input_file, output_dir) + assert output_dir.exists() + + +# ---------------------------------------------------------------------- # +# pdf_compress +# ---------------------------------------------------------------------- # +class TestPdfCompress: + """Test pdf_compress function.""" + + def test_pdf_compress_file(self, tmp_path: Path) -> None: + """Should compress PDF file.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_file = tmp_path / "compressed.pdf" + + with patch("pypdf.PdfReader") as mock_reader, patch("pypdf.PdfWriter") as mock_writer: + pdftool.pdf_compress(input_file, output_file) + assert mock_reader.called + assert mock_writer.called + + +# ---------------------------------------------------------------------- # +# pdf_encrypt +# ---------------------------------------------------------------------- # +class TestPdfEncrypt: + """Test pdf_encrypt function.""" + + def test_pdf_encrypt_file(self, tmp_path: Path) -> None: + """Should encrypt PDF file.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_file = tmp_path / "encrypted.pdf" + + with patch("pypdf.PdfReader") as mock_reader, patch("pypdf.PdfWriter") as mock_writer: + pdftool.pdf_encrypt(input_file, output_file, "password") + assert mock_reader.called + assert mock_writer.called + + +# ---------------------------------------------------------------------- # +# pdf_decrypt +# ---------------------------------------------------------------------- # +class TestPdfDecrypt: + """Test pdf_decrypt function.""" + + def test_pdf_decrypt_file(self, tmp_path: Path) -> None: + """Should decrypt PDF file.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_file = tmp_path / "decrypted.pdf" + + with patch("pypdf.PdfReader") as mock_reader, patch("pypdf.PdfWriter") as mock_writer: + pdftool.pdf_decrypt(input_file, output_file, "password") + assert mock_reader.called + assert mock_writer.called + + +# ---------------------------------------------------------------------- # +# pdf_extract_text +# ---------------------------------------------------------------------- # +class TestPdfExtractText: + """Test pdf_extract_text function.""" + + def test_pdf_extract_text_file(self, tmp_path: Path) -> None: + """Should extract text from PDF file.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_file = tmp_path / "output.txt" + + with patch("pypdf.PdfReader") as mock_reader: + mock_reader.return_value.pages = [MagicMock()] + pdftool.pdf_extract_text(input_file, output_file) + assert mock_reader.called + + +# ---------------------------------------------------------------------- # +# pdf_extract_images +# ---------------------------------------------------------------------- # +class TestPdfExtractImages: + """Test pdf_extract_images function.""" + + def test_pdf_extract_images_file(self, tmp_path: Path) -> None: + """Should extract images from PDF file.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_dir = tmp_path / "images" + output_dir.mkdir() + + with patch("pypdf.PdfReader") as mock_reader: + mock_reader.return_value.pages = [MagicMock()] + pdftool.pdf_extract_images(input_file, output_dir) + assert mock_reader.called + + +# ---------------------------------------------------------------------- # +# pdf_add_watermark +# ---------------------------------------------------------------------- # +class TestPdfAddWatermark: + """Test pdf_add_watermark function.""" + + def test_pdf_add_watermark_file(self, tmp_path: Path) -> None: + """Should add watermark to PDF file.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_file = tmp_path / "watermarked.pdf" + + with patch("pypdf.PdfReader") as mock_reader, patch("pypdf.PdfWriter") as mock_writer: + pdftool.pdf_add_watermark(input_file, output_file, text="CONFIDENTIAL") + assert mock_reader.called + assert mock_writer.called + + +# ---------------------------------------------------------------------- # +# pdf_rotate +# ---------------------------------------------------------------------- # +class TestPdfRotate: + """Test pdf_rotate function.""" + + def test_pdf_rotate_file_90(self, tmp_path: Path) -> None: + """Should rotate PDF file by 90 degrees.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_file = tmp_path / "rotated.pdf" + + with patch("pypdf.PdfReader") as mock_reader, patch("pypdf.PdfWriter") as mock_writer: + pdftool.pdf_rotate(input_file, output_file, rotation=90) + assert mock_reader.called + assert mock_writer.called + + def test_pdf_rotate_file_180(self, tmp_path: Path) -> None: + """Should rotate PDF file by 180 degrees.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_file = tmp_path / "rotated.pdf" + + with patch("pypdf.PdfReader") as mock_reader, patch("pypdf.PdfWriter") as mock_writer: + pdftool.pdf_rotate(input_file, output_file, rotation=180) + assert mock_reader.called + assert mock_writer.called + + +# ---------------------------------------------------------------------- # +# pdf_crop +# ---------------------------------------------------------------------- # +class TestPdfCrop: + """Test pdf_crop function.""" + + def test_pdf_crop_file(self, tmp_path: Path) -> None: + """Should crop PDF file.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_file = tmp_path / "cropped.pdf" + + with patch("pypdf.PdfReader") as mock_reader, patch("pypdf.PdfWriter") as mock_writer: + pdftool.pdf_crop(input_file, output_file, margins=(10, 10, 10, 10)) + assert mock_reader.called + assert mock_writer.called + + +# ---------------------------------------------------------------------- # +# pdf_info +# ---------------------------------------------------------------------- # +class TestPdfInfo: + """Test pdf_info function.""" + + def test_pdf_info_file(self, tmp_path: Path) -> None: + """Should show info of PDF file.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + + with patch("pypdf.PdfReader") as mock_reader: + pdftool.pdf_info(input_file) + assert mock_reader.called + + +# ---------------------------------------------------------------------- # +# pdf_ocr +# ---------------------------------------------------------------------- # +class TestPdfOcr: + """Test pdf_ocr function.""" + + def test_pdf_ocr_file(self, tmp_path: Path) -> None: + """Should OCR PDF file.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_file = tmp_path / "ocr.pdf" + + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + pdftool.pdf_ocr(input_file, output_file, lang="chi_sim+eng") + assert mock_run.called + + +# ---------------------------------------------------------------------- # +# pdf_to_images +# ---------------------------------------------------------------------- # +class TestPdfToImages: + """Test pdf_to_images function.""" + + def test_pdf_to_images_file(self, tmp_path: Path) -> None: + """Should convert PDF to images.""" + pytest.importorskip("pdf2image") + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_dir = tmp_path / "images" + output_dir.mkdir() + + with patch("pdf2image.convert_from_path") as mock_convert: + mock_convert.return_value = [MagicMock()] + pdftool.pdf_to_images(input_file, output_dir, dpi=300) + assert mock_convert.called + + +# ---------------------------------------------------------------------- # +# pdf_repair +# ---------------------------------------------------------------------- # +class TestPdfRepair: + """Test pdf_repair function.""" + + def test_pdf_repair_file(self, tmp_path: Path) -> None: + """Should repair PDF file.""" + input_file = tmp_path / "input.pdf" + input_file.write_bytes(b"PDF content") + output_file = tmp_path / "repaired.pdf" + + with patch("pypdf.PdfReader") as mock_reader, patch("pypdf.PdfWriter") as mock_writer: + pdftool.pdf_repair(input_file, output_file) + assert mock_reader.called + assert mock_writer.called + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_merge_single_file(self) -> None: + """main() should handle merge command with single file.""" + with patch("sys.argv", ["pdftool", "m", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_merge" + ): + pdftool.main() + assert mock_run.called + + def test_main_merge_multiple_files(self) -> None: + """main() should handle merge command with multiple files.""" + with patch("sys.argv", ["pdftool", "m", "input1.pdf", "input2.pdf", "input3.pdf"]), patch.object( + px, "run" + ) as mock_run, patch.object(pdftool, "pdf_merge"): + pdftool.main() + assert mock_run.called + + def test_main_merge_custom_output(self) -> None: + """main() should handle merge command with custom output.""" + with patch("sys.argv", ["pdftool", "m", "input.pdf", "--output", "custom.pdf"]), patch.object( + px, "run" + ) as mock_run, patch.object(pdftool, "pdf_merge"): + pdftool.main() + assert mock_run.called + + def test_main_split_file(self) -> None: + """main() should handle split command.""" + with patch("sys.argv", ["pdftool", "s", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_split" + ): + pdftool.main() + assert mock_run.called + + def test_main_split_custom_output_dir(self) -> None: + """main() should handle split command with custom output dir.""" + with patch("sys.argv", ["pdftool", "s", "input.pdf", "--output-dir", "split"]), patch.object( + px, "run" + ) as mock_run, patch.object(pdftool, "pdf_split"): + pdftool.main() + assert mock_run.called + + def test_main_compress_file(self) -> None: + """main() should handle compress command.""" + with patch("sys.argv", ["pdftool", "c", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_compress" + ): + pdftool.main() + assert mock_run.called + + def test_main_encrypt_file(self) -> None: + """main() should handle encrypt command.""" + with patch("sys.argv", ["pdftool", "e", "input.pdf", "--password", "pass"]), patch.object( + px, "run" + ) as mock_run, patch.object(pdftool, "pdf_encrypt"): + pdftool.main() + assert mock_run.called + + def test_main_decrypt_file(self) -> None: + """main() should handle decrypt command.""" + with patch("sys.argv", ["pdftool", "d", "input.pdf", "--password", "pass"]), patch.object( + px, "run" + ) as mock_run, patch.object(pdftool, "pdf_decrypt"): + pdftool.main() + assert mock_run.called + + def test_main_extract_text_file(self) -> None: + """main() should handle extract text command.""" + with patch("sys.argv", ["pdftool", "xt", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_extract_text" + ): + pdftool.main() + assert mock_run.called + + def test_main_extract_images_file(self) -> None: + """main() should handle extract images command.""" + with patch("sys.argv", ["pdftool", "xi", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_extract_images" + ): + pdftool.main() + assert mock_run.called + + def test_main_watermark_file(self) -> None: + """main() should handle watermark command.""" + with patch("sys.argv", ["pdftool", "w", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_add_watermark" + ): + pdftool.main() + assert mock_run.called + + def test_main_rotate_file(self) -> None: + """main() should handle rotate command.""" + with patch("sys.argv", ["pdftool", "r", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_rotate" + ): + pdftool.main() + assert mock_run.called + + def test_main_crop_file(self) -> None: + """main() should handle crop command.""" + with patch("sys.argv", ["pdftool", "crop", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_crop" + ): + pdftool.main() + assert mock_run.called + + def test_main_info_file(self) -> None: + """main() should handle info command.""" + with patch("sys.argv", ["pdftool", "i", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_info" + ): + pdftool.main() + assert mock_run.called + + def test_main_ocr_file(self) -> None: + """main() should handle ocr command.""" + with patch("sys.argv", ["pdftool", "ocr", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_ocr" + ): + pdftool.main() + assert mock_run.called + + def test_main_to_images_file(self) -> None: + """main() should handle to images command.""" + with patch("sys.argv", ["pdftool", "img", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_to_images" + ): + pdftool.main() + assert mock_run.called + + def test_main_repair_file(self) -> None: + """main() should handle repair command.""" + with patch("sys.argv", ["pdftool", "repair", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_repair" + ): + pdftool.main() + assert mock_run.called + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["pdftool"]), pytest.raises(SystemExit) as exc_info: + pdftool.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_spec_with_correct_name(self) -> None: + """main() should create TaskSpec with correct name.""" + with patch("sys.argv", ["pdftool", "m", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_merge" + ): + pdftool.main() + graph = mock_run.call_args[0][0] + task_names = list(graph.all_specs().keys()) + assert "pdf_merge" in task_names + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["pdftool", "m", "input.pdf"]), patch.object(px, "run") as mock_run, patch.object( + pdftool, "pdf_merge" + ): + pdftool.main() + assert mock_run.call_args[1]["strategy"] == "thread" diff --git a/tests/cli/test_piptool.py b/tests/cli/test_piptool.py new file mode 100644 index 0000000..94fdb70 --- /dev/null +++ b/tests/cli/test_piptool.py @@ -0,0 +1,219 @@ +"""Tests for cli.piptool module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +import pyflowx as px +from pyflowx.cli import piptool + + +# ---------------------------------------------------------------------- # +# pip_uninstall +# ---------------------------------------------------------------------- # +class TestPipUninstall: + """Test pip_uninstall function.""" + + def test_pip_uninstall_single_package(self) -> None: + """Should uninstall single package.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + piptool.pip_uninstall(["numpy"]) + # Should call pip uninstall + assert mock_run.called + + def test_pip_uninstall_multiple_packages(self) -> None: + """Should uninstall multiple packages.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + piptool.pip_uninstall(["numpy", "pandas", "scipy"]) + # Should call pip uninstall + assert mock_run.called + + def test_pip_uninstall_with_wildcard(self) -> None: + """Should handle wildcard in package name.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + piptool.pip_uninstall(["numpy*"]) + assert mock_run.called + + +# ---------------------------------------------------------------------- # +# pip_reinstall +# ---------------------------------------------------------------------- # +class TestPipReinstall: + """Test pip_reinstall function.""" + + def test_pip_reinstall_online(self) -> None: + """Should reinstall packages online.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + piptool.pip_reinstall(["numpy"], offline=False) + assert mock_run.called + + def test_pip_reinstall_offline(self) -> None: + """Should reinstall packages offline.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + piptool.pip_reinstall(["numpy"], offline=True) + # Should call pip install with offline flags + assert mock_run.called + + +# ---------------------------------------------------------------------- # +# pip_download +# ---------------------------------------------------------------------- # +class TestPipDownload: + """Test pip_download function.""" + + def test_pip_download_online(self) -> None: + """Should download packages online.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + piptool.pip_download(["numpy"], offline=False) + assert mock_run.called + + def test_pip_download_offline(self) -> None: + """Should download packages offline.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + piptool.pip_download(["numpy"], offline=True) + # Should call pip download with offline flags + assert mock_run.called + + +# ---------------------------------------------------------------------- # +# pip_freeze +# ---------------------------------------------------------------------- # +class TestPipFreeze: + """Test pip_freeze function.""" + + def test_pip_freeze_creates_file(self, tmp_path: Path) -> None: + """Should create requirements.txt file.""" + with patch("subprocess.run") as mock_run, patch.object(Path, "cwd", return_value=tmp_path): + mock_run.return_value = MagicMock(stdout="numpy==1.0.0\npandas==2.0.0\n", returncode=0) + piptool.pip_freeze() + # Should create requirements.txt + req_file = tmp_path / "requirements.txt" + # Note: The actual implementation might write to current directory + + def test_pip_freeze_calls_subprocess(self) -> None: + """Should call pip freeze.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(stdout="", returncode=0) + piptool.pip_freeze() + assert mock_run.called + call_args = mock_run.call_args[0][0] + assert "pip" in call_args + assert "freeze" in call_args + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_install_single_package(self) -> None: + """main() should handle install single package.""" + with patch("sys.argv", ["piptool", "i", "numpy"]), patch.object(px, "run") as mock_run: + piptool.main() + assert mock_run.called + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + for spec in specs.values(): + assert "pip" in spec.cmd + assert "install" in spec.cmd + assert "numpy" in spec.cmd + + def test_main_install_multiple_packages(self) -> None: + """main() should handle install multiple packages.""" + with patch("sys.argv", ["piptool", "i", "numpy", "pandas", "scipy"]), patch.object(px, "run") as mock_run: + piptool.main() + assert mock_run.called + + def test_main_uninstall_packages(self) -> None: + """main() should handle uninstall packages.""" + with patch("sys.argv", ["piptool", "u", "numpy"]), patch.object(px, "run") as mock_run, patch.object( + piptool, "pip_uninstall" + ): + piptool.main() + assert mock_run.called + + def test_main_reinstall_packages(self) -> None: + """main() should handle reinstall packages.""" + with patch("sys.argv", ["piptool", "r", "numpy"]), patch.object(px, "run") as mock_run, patch.object( + piptool, "pip_reinstall" + ): + piptool.main() + assert mock_run.called + + def test_main_reinstall_offline(self) -> None: + """main() should handle reinstall offline.""" + with patch("sys.argv", ["piptool", "r", "numpy", "--offline"]), patch.object( + px, "run" + ) as mock_run, patch.object(piptool, "pip_reinstall"): + piptool.main() + assert mock_run.called + + def test_main_download_packages(self) -> None: + """main() should handle download packages.""" + with patch("sys.argv", ["piptool", "d", "numpy"]), patch.object(px, "run") as mock_run, patch.object( + piptool, "pip_download" + ): + piptool.main() + assert mock_run.called + + def test_main_download_offline(self) -> None: + """main() should handle download offline.""" + with patch("sys.argv", ["piptool", "d", "numpy", "--offline"]), patch.object( + px, "run" + ) as mock_run, patch.object(piptool, "pip_download"): + piptool.main() + assert mock_run.called + + def test_main_upgrade_pip(self) -> None: + """main() should handle upgrade pip.""" + with patch("sys.argv", ["piptool", "up"]), patch.object(px, "run") as mock_run: + piptool.main() + assert mock_run.called + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + for spec in specs.values(): + assert "python" in spec.cmd + assert "-m" in spec.cmd + assert "pip" in spec.cmd + assert "install" in spec.cmd + assert "--upgrade" in spec.cmd + + def test_main_freeze(self) -> None: + """main() should handle freeze.""" + with patch("sys.argv", ["piptool", "f"]), patch.object(px, "run") as mock_run, patch.object( + piptool, "pip_freeze" + ): + piptool.main() + assert mock_run.called + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["piptool"]), pytest.raises(SystemExit) as exc_info: + piptool.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_specs_with_verbose(self) -> None: + """main() should create TaskSpecs with verbose=True.""" + with patch("sys.argv", ["piptool", "i", "numpy"]), patch.object(px, "run") as mock_run: + piptool.main() + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + for spec in specs.values(): + assert spec.verbose is True + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["piptool", "i", "numpy"]), patch.object(px, "run") as mock_run: + piptool.main() + assert mock_run.call_args[1]["strategy"] == "thread" diff --git a/tests/cli/test_screenshot.py b/tests/cli/test_screenshot.py new file mode 100644 index 0000000..c54621b --- /dev/null +++ b/tests/cli/test_screenshot.py @@ -0,0 +1,147 @@ +"""Tests for cli.screenshot module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +import pyflowx as px +from pyflowx.cli import screenshot +from pyflowx.conditions import Constants + + +# ---------------------------------------------------------------------- # +# take_screenshot_full +# ---------------------------------------------------------------------- # +class TestTakeScreenshotFull: + """Test take_screenshot_full function.""" + + def test_take_screenshot_full_windows(self, tmp_path: Path) -> None: + """Should take full screenshot on Windows.""" + if Constants.IS_WINDOWS: + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + screenshot.take_screenshot_full(filename="test.png") + assert mock_run.called + + def test_take_screenshot_full_linux(self, tmp_path: Path) -> None: + """Should take full screenshot on Linux.""" + with patch.object(Constants, "IS_WINDOWS", False), patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + screenshot.take_screenshot_full(filename="test.png") + assert mock_run.called + + def test_take_screenshot_full_with_custom_filename(self) -> None: + """Should take screenshot with custom filename.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + screenshot.take_screenshot_full(filename="custom.png") + assert mock_run.called + + def test_take_screenshot_full_default_filename(self) -> None: + """Should use default filename.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + screenshot.take_screenshot_full() + assert mock_run.called + + +# ---------------------------------------------------------------------- # +# take_screenshot_area +# ---------------------------------------------------------------------- # +class TestTakeScreenshotArea: + """Test take_screenshot_area function.""" + + def test_take_screenshot_area_windows(self, tmp_path: Path) -> None: + """Should take area screenshot on Windows.""" + if Constants.IS_WINDOWS: + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + screenshot.take_screenshot_area(filename="test.png") + assert mock_run.called + + def test_take_screenshot_area_linux(self, tmp_path: Path) -> None: + """Should take area screenshot on Linux.""" + with patch.object(Constants, "IS_WINDOWS", False), patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + screenshot.take_screenshot_area(filename="test.png") + assert mock_run.called + + def test_take_screenshot_area_with_custom_filename(self) -> None: + """Should take screenshot with custom filename.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + screenshot.take_screenshot_area(filename="custom.png") + assert mock_run.called + + def test_take_screenshot_area_default_filename(self) -> None: + """Should use default filename.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + screenshot.take_screenshot_area() + assert mock_run.called + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_full_default_filename(self) -> None: + """main() should handle full command with default filename.""" + with patch("sys.argv", ["screenshot", "full"]), patch.object(px, "run") as mock_run, patch.object( + screenshot, "take_screenshot_full" + ): + screenshot.main() + assert mock_run.called + + def test_main_full_custom_filename(self) -> None: + """main() should handle full command with custom filename.""" + with patch("sys.argv", ["screenshot", "full", "--filename", "custom.png"]), patch.object( + px, "run" + ) as mock_run, patch.object(screenshot, "take_screenshot_full"): + screenshot.main() + assert mock_run.called + + def test_main_area_default_filename(self) -> None: + """main() should handle area command with default filename.""" + with patch("sys.argv", ["screenshot", "area"]), patch.object(px, "run") as mock_run, patch.object( + screenshot, "take_screenshot_area" + ): + screenshot.main() + assert mock_run.called + + def test_main_area_custom_filename(self) -> None: + """main() should handle area command with custom filename.""" + with patch("sys.argv", ["screenshot", "area", "--filename", "custom.png"]), patch.object( + px, "run" + ) as mock_run, patch.object(screenshot, "take_screenshot_area"): + screenshot.main() + assert mock_run.called + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["screenshot"]), pytest.raises(SystemExit) as exc_info: + screenshot.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_spec_with_correct_name(self) -> None: + """main() should create TaskSpec with correct name.""" + with patch("sys.argv", ["screenshot", "full"]), patch.object(px, "run") as mock_run, patch.object( + screenshot, "take_screenshot_full" + ): + screenshot.main() + graph = mock_run.call_args[0][0] + task_names = list(graph.all_specs().keys()) + assert "screenshot_full" in task_names + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["screenshot", "full"]), patch.object(px, "run") as mock_run, patch.object( + screenshot, "take_screenshot_full" + ): + screenshot.main() + assert mock_run.call_args[1]["strategy"] == "thread" diff --git a/tests/cli/test_sshcopyid.py b/tests/cli/test_sshcopyid.py new file mode 100644 index 0000000..5382fb7 --- /dev/null +++ b/tests/cli/test_sshcopyid.py @@ -0,0 +1,129 @@ +"""Tests for cli.sshcopyid module.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +import pyflowx as px +from pyflowx.cli import sshcopyid + + +# ---------------------------------------------------------------------- # +# ssh_copy_id +# ---------------------------------------------------------------------- # +class TestSshCopyId: + """Test ssh_copy_id function.""" + + def test_ssh_copy_id_success(self) -> None: + """ssh_copy_id should deploy SSH key successfully.""" + pytest.importorskip("paramiko") + with patch("paramiko.SSHClient") as mock_ssh_client, patch.object( + Path, "exists", return_value=True + ), patch.object(Path, "read_text", return_value="ssh-rsa AAAAB3..."): + mock_client = MagicMock() + mock_ssh_client.return_value = mock_client + mock_client.connect.return_value = None + mock_client.exec_command.return_value = (MagicMock(), MagicMock(), MagicMock()) + + result = sshcopyid.ssh_copy_id("localhost", "user", "password") + assert result is None # Function doesn't return anything + + def test_ssh_copy_id_with_custom_port(self) -> None: + """ssh_copy_id should handle custom port.""" + pytest.importorskip("paramiko") + with patch("paramiko.SSHClient") as mock_ssh_client, patch.object( + Path, "exists", return_value=True + ), patch.object(Path, "read_text", return_value="ssh-rsa AAAAB3..."): + mock_client = MagicMock() + mock_ssh_client.return_value = mock_client + mock_client.connect.return_value = None + mock_client.exec_command.return_value = (MagicMock(), MagicMock(), MagicMock()) + + result = sshcopyid.ssh_copy_id("localhost", "user", "password", port=2222) + # Verify that connect was called with custom port + mock_client.connect.assert_called_once() + call_args = mock_client.connect.call_args + assert call_args[1]["port"] == 2222 + + def test_ssh_copy_id_with_custom_keypath(self) -> None: + """ssh_copy_id should handle custom key path.""" + pytest.importorskip("paramiko") + with patch("paramiko.SSHClient") as mock_ssh_client, patch.object( + Path, "exists", return_value=True + ), patch.object(Path, "read_text", return_value="ssh-rsa AAAAB3..."): + mock_client = MagicMock() + mock_ssh_client.return_value = mock_client + mock_client.connect.return_value = None + mock_client.exec_command.return_value = (MagicMock(), MagicMock(), MagicMock()) + + result = sshcopyid.ssh_copy_id("localhost", "user", "password", keypath="/custom/key.pub") + # Verify that the custom keypath was used + assert result is None + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_with_required_args(self) -> None: + """main() should handle required arguments.""" + with patch("sys.argv", ["sshcopyid", "localhost", "user", "password"]), patch.object( + px, "run" + ) as mock_run, patch.object(sshcopyid, "ssh_copy_id"): + sshcopyid.main() + assert mock_run.called + graph = mock_run.call_args[0][0] + assert isinstance(graph, px.Graph) + + def test_main_with_custom_port(self) -> None: + """main() should handle custom port argument.""" + with patch("sys.argv", ["sshcopyid", "localhost", "user", "password", "--port", "2222"]), patch.object( + px, "run" + ) as mock_run, patch.object(sshcopyid, "ssh_copy_id"): + sshcopyid.main() + assert mock_run.called + + def test_main_with_custom_keypath(self) -> None: + """main() should handle custom keypath argument.""" + with patch( + "sys.argv", ["sshcopyid", "localhost", "user", "password", "--keypath", "/custom/key.pub"] + ), patch.object(px, "run") as mock_run, patch.object(sshcopyid, "ssh_copy_id"): + sshcopyid.main() + assert mock_run.called + + def test_main_with_custom_timeout(self) -> None: + """main() should handle custom timeout argument.""" + with patch("sys.argv", ["sshcopyid", "localhost", "user", "password", "--timeout", "60"]), patch.object( + px, "run" + ) as mock_run, patch.object(sshcopyid, "ssh_copy_id"): + sshcopyid.main() + assert mock_run.called + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["sshcopyid"]), pytest.raises(SystemExit) as exc_info: + sshcopyid.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_spec_with_correct_name(self) -> None: + """main() should create TaskSpec with correct name.""" + with patch("sys.argv", ["sshcopyid", "localhost", "user", "password"]), patch.object( + px, "run" + ) as mock_run, patch.object(sshcopyid, "ssh_copy_id"): + sshcopyid.main() + graph = mock_run.call_args[0][0] + task_names = list(graph.all_specs().keys()) + assert "ssh_deploy" in task_names + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["sshcopyid", "localhost", "user", "password"]), patch.object( + px, "run" + ) as mock_run, patch.object(sshcopyid, "ssh_copy_id"): + sshcopyid.main() + assert mock_run.call_args[1]["strategy"] == "thread" diff --git a/tests/cli/test_taskkill.py b/tests/cli/test_taskkill.py new file mode 100644 index 0000000..16b2875 --- /dev/null +++ b/tests/cli/test_taskkill.py @@ -0,0 +1,107 @@ +"""Tests for cli.taskkill module.""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +import pyflowx as px +from pyflowx.cli import taskkill +from pyflowx.conditions import Constants + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_with_single_process(self) -> None: + """main() should handle single process argument.""" + with patch("sys.argv", ["taskkill", "chrome.exe"]), \ + patch.object(px, "run") as mock_run: + taskkill.main() + assert mock_run.called + graph = mock_run.call_args[0][0] + assert isinstance(graph, px.Graph) + + def test_main_with_multiple_processes(self) -> None: + """main() should handle multiple process arguments.""" + with patch("sys.argv", ["taskkill", "chrome.exe", "python.exe", "node.exe"]), \ + patch.object(px, "run") as mock_run: + taskkill.main() + assert mock_run.called + graph = mock_run.call_args[0][0] + assert isinstance(graph, px.Graph) + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["taskkill"]), pytest.raises(SystemExit) as exc_info: + taskkill.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_specs_with_correct_names(self) -> None: + """main() should create TaskSpecs with correct names.""" + with patch("sys.argv", ["taskkill", "chrome.exe", "python.exe"]), \ + patch.object(px, "run") as mock_run: + taskkill.main() + graph = mock_run.call_args[0][0] + task_names = list(graph.all_specs().keys()) + assert "kill_chrome.exe" in task_names + assert "kill_python.exe" in task_names + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["taskkill", "chrome.exe"]), \ + patch.object(px, "run") as mock_run: + taskkill.main() + assert mock_run.call_args[1]["strategy"] == "thread" + + def test_main_windows_command_format(self) -> None: + """main() should use Windows command format on Windows.""" + if Constants.IS_WINDOWS: + with patch("sys.argv", ["taskkill", "chrome.exe"]), \ + patch.object(px, "run") as mock_run: + taskkill.main() + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + # Check that command includes Windows taskkill format + for spec in specs.values(): + assert spec.cmd[0] == "taskkill" + assert spec.cmd[1] == "/f" + assert spec.cmd[2] == "/im" + + def test_main_linux_command_format(self) -> None: + """main() should use Linux command format on Linux.""" + with patch.object(Constants, "IS_WINDOWS", False), \ + patch("sys.argv", ["taskkill", "chrome.exe"]), \ + patch.object(px, "run") as mock_run: + taskkill.main() + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + # Check that command includes Linux pkill format + for spec in specs.values(): + assert spec.cmd[0] == "pkill" + assert spec.cmd[1] == "-f" + + def test_main_tasks_have_verbose_true(self) -> None: + """main() should create tasks with verbose=True.""" + with patch("sys.argv", ["taskkill", "chrome.exe"]), \ + patch.object(px, "run") as mock_run: + taskkill.main() + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + for spec in specs.values(): + assert spec.verbose is True + + def test_main_adds_wildcard_to_process_name(self) -> None: + """main() should add wildcard to process name.""" + with patch("sys.argv", ["taskkill", "chrome.exe"]), \ + patch.object(px, "run") as mock_run: + taskkill.main() + graph = mock_run.call_args[0][0] + specs = graph.all_specs() + # Check that wildcard is added + for spec in specs.values(): + assert spec.cmd[-1].endswith("*") \ No newline at end of file diff --git a/tests/cli/test_which.py b/tests/cli/test_which.py new file mode 100644 index 0000000..44cd7be --- /dev/null +++ b/tests/cli/test_which.py @@ -0,0 +1,106 @@ +"""Tests for cli.which module.""" + +from __future__ import annotations + +import shutil +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +import pyflowx as px +from pyflowx.cli import which + + +# ---------------------------------------------------------------------- # +# which_command +# ---------------------------------------------------------------------- # +class TestWhichCommand: + """Test which_command function.""" + + def test_returns_path_when_command_found(self, capsys: pytest.CaptureFixture[str]) -> None: + """Should return Path when command is found.""" + with patch.object(shutil, "which", return_value="/usr/bin/python"): + result = which.which_command("python") + assert result == Path("/usr/bin/python") + captured = capsys.readouterr() + assert "匹配路径" in captured.out + assert "/usr/bin/python" in captured.out + + def test_returns_none_when_command_not_found(self, capsys: pytest.CaptureFixture[str]) -> None: + """Should return None when command is not found.""" + with patch.object(shutil, "which", return_value=None): + result = which.which_command("nonexistent_cmd") + assert result is None + captured = capsys.readouterr() + assert "未找到" in captured.out + assert "nonexistent_cmd" in captured.out + + def test_prints_match_path_on_success(self, capsys: pytest.CaptureFixture[str]) -> None: + """Should print '匹配路径: - ' on success.""" + with patch.object(shutil, "which", return_value="C:\\Python\\python.exe"): + _ = which.which_command("python") + captured = capsys.readouterr() + assert "匹配路径: - C:\\Python\\python.exe" in captured.out + + def test_prints_not_found_on_failure(self, capsys: pytest.CaptureFixture[str]) -> None: + """Should print ': 未找到' on failure.""" + with patch.object(shutil, "which", return_value=None): + _ = which.which_command("missing") + captured = capsys.readouterr() + assert "missing: 未找到" in captured.out + + +# ---------------------------------------------------------------------- # +# main function +# ---------------------------------------------------------------------- # +class TestMain: + """Test main function.""" + + def test_main_with_single_command(self) -> None: + """main() should handle single command argument.""" + with patch("sys.argv", ["which", "python"]), \ + patch.object(shutil, "which", return_value="/usr/bin/python"), \ + patch.object(px, "run") as mock_run: + which.main() + # Should create a graph with one task + assert mock_run.called + graph = mock_run.call_args[0][0] + assert isinstance(graph, px.Graph) + + def test_main_with_multiple_commands(self) -> None: + """main() should handle multiple command arguments.""" + with patch("sys.argv", ["which", "python", "pip", "node"]), \ + patch.object(shutil, "which", return_value="/usr/bin/cmd"), \ + patch.object(px, "run") as mock_run: + which.main() + # Should create a graph with three tasks + assert mock_run.called + graph = mock_run.call_args[0][0] + assert isinstance(graph, px.Graph) + + def test_main_with_no_args_shows_help(self) -> None: + """main() with no args should show help and exit.""" + with patch("sys.argv", ["which"]), pytest.raises(SystemExit) as exc_info: + which.main() + assert exc_info.value.code == 2 + + def test_main_creates_task_specs_with_correct_names(self) -> None: + """main() should create TaskSpecs with correct names.""" + with patch("sys.argv", ["which", "git", "npm"]), \ + patch.object(shutil, "which", return_value="/usr/bin/cmd"), \ + patch.object(px, "run") as mock_run: + which.main() + graph = mock_run.call_args[0][0] + # Check that task names are correct + task_names = list(graph.all_specs().keys()) + assert "which_git" in task_names + assert "which_npm" in task_names + + def test_main_uses_thread_strategy(self) -> None: + """main() should use thread strategy.""" + with patch("sys.argv", ["which", "python"]), \ + patch.object(shutil, "which", return_value="/usr/bin/python"), \ + patch.object(px, "run") as mock_run: + which.main() + assert mock_run.call_args[1]["strategy"] == "thread" \ No newline at end of file