refactor: 重构所有CLI工具,替换内置Runner为原生argparse实现

This commit is contained in:
2026-06-22 07:51:29 +08:00
parent 50e74180a2
commit 0b97846d77
14 changed files with 586 additions and 503 deletions
+1 -1
View File
@@ -39,7 +39,7 @@ pymake = "pyflowx.cli.pymake:main"
scrcap = "pyflowx.cli.screenshot:main" scrcap = "pyflowx.cli.screenshot:main"
sshcopy = "pyflowx.cli.sshcopyid:main" sshcopy = "pyflowx.cli.sshcopyid:main"
taskk = "pyflowx.cli.taskkill:main" taskk = "pyflowx.cli.taskkill:main"
whichcmd = "pyflowx.cli.which:main" wch = "pyflowx.cli.which:main"
[project.optional-dependencies] [project.optional-dependencies]
dev = [ dev = [
+48 -35
View File
@@ -6,6 +6,7 @@
from __future__ import annotations from __future__ import annotations
import argparse
import ast import ast
import subprocess import subprocess
from pathlib import Path from pathlib import Path
@@ -227,26 +228,6 @@ def format_all(root_dir: Path) -> None:
print(f"格式化完成: {root_dir}") print(f"格式化完成: {root_dir}")
# ============================================================================
# TaskSpec 定义
# ============================================================================
# ruff format
ruff_format: px.TaskSpec = px.TaskSpec("ruff_format", cmd=["ruff", "format", "."])
# ruff check
ruff_check: px.TaskSpec = px.TaskSpec("ruff_check", cmd=["ruff", "check", "--fix", "--unsafe-fixes", "."])
# 自动添加 docstring
auto_docstring: px.TaskSpec = px.TaskSpec("auto_docstring", fn=lambda: auto_add_docstrings(Path()))
# 同步 pyproject.toml 配置
sync_config: px.TaskSpec = px.TaskSpec("sync_config", fn=lambda: sync_pyproject_config(Path()))
# 格式化所有文件
format_all_files: px.TaskSpec = px.TaskSpec("format_all", fn=lambda: format_all(Path()))
# ============================================================================ # ============================================================================
# CLI Runner # CLI Runner
# ============================================================================ # ============================================================================
@@ -254,20 +235,52 @@ format_all_files: px.TaskSpec = px.TaskSpec("format_all", fn=lambda: format_all(
def main() -> None: def main() -> None:
"""自动格式化工具主函数.""" """自动格式化工具主函数."""
runner = px.CliRunner( parser = argparse.ArgumentParser(
strategy="thread",
description="AutoFmt - 自动格式化工具", description="AutoFmt - 自动格式化工具",
graphs={ usage="autofmt <command> [options]",
# ruff format
"fmt": px.Graph.from_specs([ruff_format]),
# ruff check
"lint": px.Graph.from_specs([ruff_check]),
# 自动添加 docstring
"doc": px.Graph.from_specs([auto_docstring]),
# 同步 pyproject.toml 配置
"sync": px.Graph.from_specs([sync_config]),
# 格式化所有文件
"all": px.Graph.from_specs([ruff_format, ruff_check]),
},
) )
runner.run_cli() subparsers = parser.add_subparsers(dest="command", help="可用命令")
# ruff format 命令
format_parser = subparsers.add_parser("fmt", help="使用 ruff 格式化代码")
format_parser.add_argument("--target", type=str, default=".", help="目标路径")
# ruff check 命令
lint_parser = subparsers.add_parser("lint", help="使用 ruff 检查代码")
lint_parser.add_argument("--target", type=str, default=".", help="目标路径")
lint_parser.add_argument("--fix", action="store_true", help="自动修复")
# 自动添加 docstring 命令
doc_parser = subparsers.add_parser("doc", help="自动添加 docstring")
doc_parser.add_argument("--root-dir", type=str, default=".", help="根目录")
# 同步配置命令
sync_parser = subparsers.add_parser("sync", help="同步 pyproject.toml 配置")
sync_parser.add_argument("--root-dir", type=str, default=".", help="根目录")
args = parser.parse_args()
if args.command == "fmt":
graph = px.Graph.from_specs([
px.TaskSpec("ruff_format", cmd=["ruff", "format", args.target], verbose=True)
])
elif args.command == "lint":
cmd = ["ruff", "check", args.target]
if args.fix:
cmd.extend(["--fix", "--unsafe-fixes"])
graph = px.Graph.from_specs([
px.TaskSpec("ruff_check", cmd=cmd, verbose=True)
])
elif args.command == "doc":
graph = px.Graph.from_specs([
px.TaskSpec("auto_docstring", fn=auto_add_docstrings, args=(Path(args.root_dir),), verbose=True)
])
elif args.command == "sync":
graph = px.Graph.from_specs([
px.TaskSpec("sync_config", fn=sync_pyproject_config, args=(Path(args.root_dir),), verbose=True)
])
else:
parser.print_help()
return
px.run(graph, strategy="thread")
+21 -17
View File
@@ -6,6 +6,7 @@
from __future__ import annotations from __future__ import annotations
import argparse
import os import os
from pathlib import Path from pathlib import Path
@@ -90,14 +91,6 @@ def set_pip_mirror(mirror: str = "tsinghua", token: str | None = None) -> None:
print(f"已设置 pip 镜像源: {mirror} ({index_url})") print(f"已设置 pip 镜像源: {mirror} ({index_url})")
# ============================================================================
# TaskSpec 定义
# ============================================================================
envpy_tsinghua: px.TaskSpec = px.TaskSpec("envpy_tsinghua", fn=lambda: set_pip_mirror("tsinghua"))
envpy_aliyun: px.TaskSpec = px.TaskSpec("envpy_aliyun", fn=lambda: set_pip_mirror("aliyun"))
# ============================================================================ # ============================================================================
# CLI Runner # CLI Runner
# ============================================================================ # ============================================================================
@@ -105,14 +98,25 @@ envpy_aliyun: px.TaskSpec = px.TaskSpec("envpy_aliyun", fn=lambda: set_pip_mirro
def main() -> None: def main() -> None:
"""Python 环境配置工具主函数.""" """Python 环境配置工具主函数."""
runner = px.CliRunner( parser = argparse.ArgumentParser(
strategy="thread",
description="EnvPy - Python 环境配置工具", description="EnvPy - Python 环境配置工具",
graphs={ usage="envpy <command> [options]",
# 设置清华镜像源
"t": px.Graph.from_specs([envpy_tsinghua]),
# 设置阿里云镜像源
"a": px.Graph.from_specs([envpy_aliyun]),
},
) )
runner.run_cli() subparsers = parser.add_subparsers(dest="command", help="可用命令")
# 设置镜像源命令
mirror_parser = subparsers.add_parser("mirror", help="设置 pip 镜像源")
mirror_parser.add_argument("name", choices=["tsinghua", "aliyun"], help="镜像源名称")
mirror_parser.add_argument("--token", type=str, help="PyPI token for publishing")
args = parser.parse_args()
if args.command == "mirror":
graph = px.Graph.from_specs([
px.TaskSpec("set_pip_mirror", fn=set_pip_mirror, args=(args.name,), kwargs={"token": args.token})
])
else:
parser.print_help()
return
px.run(graph, strategy="thread")
+49 -34
View File
@@ -6,9 +6,11 @@
from __future__ import annotations from __future__ import annotations
import argparse
import os import os
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import Literal, get_args
import pyflowx as px import pyflowx as px
@@ -34,8 +36,11 @@ RUSTUP_MIRRORS: dict[str, dict[str, str]] = {
}, },
} }
DEFAULT_PYTHON_VERSION: str = "nightly" UsableRustVersion = Literal["stable", "nightly", "beta"]
DEFAULT_MIRROR: str = "aliyun" UsableMirror = Literal["aliyun", "ustc", "tsinghua"]
DEFAULT_RUST_VERSION: str = "stable"
DEFAULT_MIRROR: UsableMirror = "tsinghua"
# ============================================================================ # ============================================================================
@@ -43,7 +48,7 @@ DEFAULT_MIRROR: str = "aliyun"
# ============================================================================ # ============================================================================
def set_rust_mirror(mirror: str = "aliyun") -> None: def set_rust_mirror(mirror: UsableMirror = DEFAULT_MIRROR) -> None:
"""设置 Rust 镜像源. """设置 Rust 镜像源.
Parameters Parameters
@@ -51,7 +56,7 @@ def set_rust_mirror(mirror: str = "aliyun") -> None:
mirror : str mirror : str
镜像源名称: aliyun, ustc, tsinghua 镜像源名称: aliyun, ustc, tsinghua
""" """
mirror_dict = RUSTUP_MIRRORS.get(mirror, RUSTUP_MIRRORS["aliyun"]) mirror_dict = RUSTUP_MIRRORS.get(mirror, RUSTUP_MIRRORS[DEFAULT_MIRROR])
server = mirror_dict["RUSTUP_DIST_SERVER"] server = mirror_dict["RUSTUP_DIST_SERVER"]
update_root = mirror_dict["RUSTUP_UPDATE_ROOT"] update_root = mirror_dict["RUSTUP_UPDATE_ROOT"]
toml_registry = mirror_dict["TOML_REGISTRY"] toml_registry = mirror_dict["TOML_REGISTRY"]
@@ -79,7 +84,7 @@ index = "sparse+{toml_registry}"
print(f"已设置 Rust 镜像源: {mirror}") print(f"已设置 Rust 镜像源: {mirror}")
def install_rust(version: str = "nightly") -> None: def install_rust(version: UsableRustVersion = DEFAULT_RUST_VERSION) -> None:
"""安装 Rust 工具链. """安装 Rust 工具链.
Parameters Parameters
@@ -95,20 +100,6 @@ def install_rust(version: str = "nightly") -> None:
raise raise
# ============================================================================
# TaskSpec 定义
# ============================================================================
envrs_aliyun: px.TaskSpec = px.TaskSpec("envrs_aliyun", fn=lambda: set_rust_mirror("aliyun"))
envrs_ustc: px.TaskSpec = px.TaskSpec("envrs_ustc", fn=lambda: set_rust_mirror("ustc"))
envrs_tsinghua: px.TaskSpec = px.TaskSpec("envrs_tsinghua", fn=lambda: set_rust_mirror("tsinghua"))
rust_install_stable: px.TaskSpec = px.TaskSpec("rust_install_stable", cmd=["rustup", "toolchain", "install", "stable"])
rust_install_nightly: px.TaskSpec = px.TaskSpec(
"rust_install_nightly", cmd=["rustup", "toolchain", "install", "nightly"]
)
# ============================================================================ # ============================================================================
# CLI Runner # CLI Runner
# ============================================================================ # ============================================================================
@@ -116,20 +107,44 @@ rust_install_nightly: px.TaskSpec = px.TaskSpec(
def main() -> None: def main() -> None:
"""Rust 环境配置工具主函数.""" """Rust 环境配置工具主函数."""
runner = px.CliRunner( parser = argparse.ArgumentParser(
strategy="thread",
description="EnvRs - Rust 环境配置工具", description="EnvRs - Rust 环境配置工具",
graphs={ usage="envrs <command> [options]",
# 设置阿里云镜像源
"a": px.Graph.from_specs([envrs_aliyun]),
# 设置中科大镜像源
"u": px.Graph.from_specs([envrs_ustc]),
# 设置清华镜像源
"t": px.Graph.from_specs([envrs_tsinghua]),
# 安装 stable 版本
"s": px.Graph.from_specs([rust_install_stable]),
# 安装 nightly 版本
"n": px.Graph.from_specs([rust_install_nightly]),
},
) )
runner.run_cli() subparsers = parser.add_subparsers(dest="command", help="可用命令")
# 设置镜像源命令
mirror_parser = subparsers.add_parser("mirror", help="设置 Rust 镜像源")
mirror_parser.add_argument(
"name",
nargs="?",
default=DEFAULT_MIRROR,
choices=get_args(UsableMirror),
help=f"镜像源名称 ({get_args(UsableMirror)})",
)
# 安装 Rust 命令
install_parser = subparsers.add_parser("install", help="安装 Rust 工具链")
install_parser.add_argument(
"version",
nargs="?",
default=DEFAULT_RUST_VERSION,
choices=get_args(UsableRustVersion),
help=f"Rust 版本 ({get_args(UsableRustVersion)})",
)
args = parser.parse_args()
if args.command == "mirror":
graph = px.Graph.from_specs([
px.TaskSpec("set_rust_mirror", fn=set_rust_mirror, args=(args.name,), verbose=True)
])
elif args.command == "install":
graph = px.Graph.from_specs([
px.TaskSpec("install_rust", cmd=["rustup", "toolchain", "install", args.version], verbose=True)
])
else:
parser.print_help()
return
px.run(graph, strategy="thread", verbose=True)
+38 -17
View File
@@ -6,6 +6,7 @@
from __future__ import annotations from __future__ import annotations
import argparse
import re import re
import time import time
from pathlib import Path from pathlib import Path
@@ -88,14 +89,6 @@ def process_files_date(targets: list[Path], clear: bool = False) -> None:
process_file_date(target, clear) process_file_date(target, clear)
# ============================================================================
# TaskSpec 定义
# ============================================================================
filedate_clear: px.TaskSpec = px.TaskSpec("filedate_clear", fn=lambda: process_files_date([], clear=True))
filedate_add: px.TaskSpec = px.TaskSpec("filedate_add", fn=lambda: process_files_date([], clear=False))
# ============================================================================ # ============================================================================
# CLI Runner # CLI Runner
# ============================================================================ # ============================================================================
@@ -103,14 +96,42 @@ filedate_add: px.TaskSpec = px.TaskSpec("filedate_add", fn=lambda: process_files
def main() -> None: def main() -> None:
"""文件日期处理工具主函数.""" """文件日期处理工具主函数."""
runner = px.CliRunner( parser = argparse.ArgumentParser(
strategy="thread",
description="FileDate - 文件日期处理工具", description="FileDate - 文件日期处理工具",
graphs={ usage="filedate <command> [options]",
# 清除日期前缀
"c": px.Graph.from_specs([filedate_clear]),
# 添加日期前缀
"a": px.Graph.from_specs([filedate_add]),
},
) )
runner.run_cli() subparsers = parser.add_subparsers(dest="command", help="可用命令")
# 添加日期前缀命令
add_parser = subparsers.add_parser("add", help="添加日期前缀")
add_parser.add_argument("files", nargs="+", help="文件路径")
# 清除日期前缀命令
clear_parser = subparsers.add_parser("clear", help="清除日期前缀")
clear_parser.add_argument("files", nargs="+", help="文件路径")
args = parser.parse_args()
if args.command == "add":
graph = px.Graph.from_specs([
px.TaskSpec(
"process_files_date",
fn=process_files_date,
args=([Path(f) for f in args.files],),
kwargs={"clear": False},
)
])
elif args.command == "clear":
graph = px.Graph.from_specs([
px.TaskSpec(
"process_files_date",
fn=process_files_date,
args=([Path(f) for f in args.files],),
kwargs={"clear": True},
)
])
else:
parser.print_help()
return
px.run(graph, strategy="thread")
+21 -26
View File
@@ -6,6 +6,7 @@
from __future__ import annotations from __future__ import annotations
import argparse
from pathlib import Path from pathlib import Path
import pyflowx as px import pyflowx as px
@@ -104,17 +105,6 @@ def process_files_level(targets: list[Path], level: int = 0) -> None:
process_file_level(target, level) process_file_level(target, level)
# ============================================================================
# TaskSpec 定义
# ============================================================================
filelevel_clear: px.TaskSpec = px.TaskSpec("filelevel_clear", fn=lambda: process_files_level([], level=0))
filelevel_pub: px.TaskSpec = px.TaskSpec("filelevel_pub", fn=lambda: process_files_level([], level=1))
filelevel_int: px.TaskSpec = px.TaskSpec("filelevel_int", fn=lambda: process_files_level([], level=2))
filelevel_con: px.TaskSpec = px.TaskSpec("filelevel_con", fn=lambda: process_files_level([], level=3))
filelevel_cla: px.TaskSpec = px.TaskSpec("filelevel_cla", fn=lambda: process_files_level([], level=4))
# ============================================================================ # ============================================================================
# CLI Runner # CLI Runner
# ============================================================================ # ============================================================================
@@ -122,20 +112,25 @@ filelevel_cla: px.TaskSpec = px.TaskSpec("filelevel_cla", fn=lambda: process_fil
def main() -> None: def main() -> None:
"""文件等级重命名工具主函数.""" """文件等级重命名工具主函数."""
runner = px.CliRunner( parser = argparse.ArgumentParser(
strategy="thread",
description="FileLevel - 文件等级重命名工具", description="FileLevel - 文件等级重命名工具",
graphs={ usage="filelevel <command> [options]",
# 清除等级标记
"c": px.Graph.from_specs([filelevel_clear]),
# 设置公开等级 (PUB)
"pub": px.Graph.from_specs([filelevel_pub]),
# 设置内部等级 (INT)
"int": px.Graph.from_specs([filelevel_int]),
# 设置机密等级 (CON)
"con": px.Graph.from_specs([filelevel_con]),
# 设置绝密等级 (CLA)
"cla": px.Graph.from_specs([filelevel_cla]),
},
) )
runner.run_cli() subparsers = parser.add_subparsers(dest="command", help="可用命令")
# 设置等级命令
level_parser = subparsers.add_parser("set", help="设置文件等级")
level_parser.add_argument("files", nargs="+", help="文件路径")
level_parser.add_argument("--level", type=int, choices=[0, 1, 2, 3, 4], required=True, help="文件等级 (0-4)")
args = parser.parse_args()
if args.command == "set":
graph = px.Graph.from_specs([
px.TaskSpec("process_files_level", fn=process_files_level, args=([Path(f) for f in args.files], args.level))
])
else:
parser.print_help()
return
px.run(graph, strategy="thread")
+35 -28
View File
@@ -6,6 +6,7 @@
from __future__ import annotations from __future__ import annotations
import argparse
import subprocess import subprocess
from pathlib import Path from pathlib import Path
@@ -128,23 +129,6 @@ def check_ls_dyna_status() -> None:
print(f"检查进程状态失败: {e}") print(f"检查进程状态失败: {e}")
# ============================================================================
# TaskSpec 定义
# ============================================================================
lscalc_default: px.TaskSpec = px.TaskSpec(
"lscalc_default",
fn=lambda: run_ls_dyna(DEFAULT_INPUT_FILE, DEFAULT_NCPU),
)
lscalc_mpi: px.TaskSpec = px.TaskSpec(
"lscalc_mpi",
fn=lambda: run_ls_dyna_mpi(DEFAULT_INPUT_FILE, DEFAULT_NCPU),
)
lscalc_status: px.TaskSpec = px.TaskSpec("lscalc_status", fn=check_ls_dyna_status)
# ============================================================================ # ============================================================================
# CLI Runner # CLI Runner
# ============================================================================ # ============================================================================
@@ -152,16 +136,39 @@ lscalc_status: px.TaskSpec = px.TaskSpec("lscalc_status", fn=check_ls_dyna_statu
def main() -> None: def main() -> None:
"""LS-DYNA 计算工具主函数.""" """LS-DYNA 计算工具主函数."""
runner = px.CliRunner( parser = argparse.ArgumentParser(
strategy="thread",
description="LSCalc - LS-DYNA 计算工具", description="LSCalc - LS-DYNA 计算工具",
graphs={ usage="lscalc <command> [options]",
# 运行 LS-DYNA 计算
"r": px.Graph.from_specs([lscalc_default]),
# 运行 LS-DYNA MPI 计算
"mpi": px.Graph.from_specs([lscalc_mpi]),
# 检查进程状态
"s": px.Graph.from_specs([lscalc_status]),
},
) )
runner.run_cli() subparsers = parser.add_subparsers(dest="command", help="可用命令")
# 运行计算命令
run_parser = subparsers.add_parser("run", help="运行 LS-DYNA 计算")
run_parser.add_argument("input_file", help="输入文件路径")
run_parser.add_argument("--ncpu", type=int, default=DEFAULT_NCPU, help="CPU 核心数")
# 运行 MPI 计算命令
mpi_parser = subparsers.add_parser("mpi", help="运行 LS-DYNA MPI 计算")
mpi_parser.add_argument("input_file", help="输入文件路径")
mpi_parser.add_argument("--ncpu", type=int, default=DEFAULT_NCPU, help="CPU 核心数")
# 检查进程状态命令
subparsers.add_parser("status", help="检查 LS-DYNA 进程状态")
args = parser.parse_args()
if args.command == "run":
graph = px.Graph.from_specs([
px.TaskSpec("run_ls_dyna", fn=run_ls_dyna, args=(args.input_file,), kwargs={"ncpu": args.ncpu})
])
elif args.command == "mpi":
graph = px.Graph.from_specs([
px.TaskSpec("run_ls_dyna_mpi", fn=run_ls_dyna_mpi, args=(args.input_file,), kwargs={"ncpu": args.ncpu})
])
elif args.command == "status":
graph = px.Graph.from_specs([px.TaskSpec("check_ls_dyna_status", fn=check_ls_dyna_status)])
else:
parser.print_help()
return
px.run(graph, strategy="thread")
+82 -48
View File
@@ -6,6 +6,7 @@
from __future__ import annotations from __future__ import annotations
import argparse
import shutil import shutil
import subprocess import subprocess
import zipfile import zipfile
@@ -246,31 +247,6 @@ def clean_build_dir(build_dir: Path) -> None:
print(f"目录不存在: {build_dir}") print(f"目录不存在: {build_dir}")
# ============================================================================
# TaskSpec 定义
# ============================================================================
# 源码打包
pack_source_default: px.TaskSpec = px.TaskSpec("pack_source", fn=lambda: pack_source(Path(), Path(DEFAULT_BUILD_DIR)))
# 依赖打包
pack_deps_default: px.TaskSpec = px.TaskSpec("pack_deps", fn=lambda: pack_dependencies(Path(DEFAULT_LIB_DIR), []))
# Wheel 打包
pack_wheel_default: px.TaskSpec = px.TaskSpec("pack_wheel", fn=lambda: pack_wheel(Path(), Path(DEFAULT_DIST_DIR)))
# 嵌入式 Python 安装
install_embed_default: px.TaskSpec = px.TaskSpec(
"install_embed", fn=lambda: install_embed_python("3.10", Path("python"))
)
# ZIP 打包
create_zip_default: px.TaskSpec = px.TaskSpec("create_zip", fn=lambda: create_zip_package(Path(), Path("package.zip")))
# 清理构建目录
clean_build: px.TaskSpec = px.TaskSpec("clean_build", fn=lambda: clean_build_dir(Path(DEFAULT_BUILD_DIR)))
# ============================================================================ # ============================================================================
# CLI Runner # CLI Runner
# ============================================================================ # ============================================================================
@@ -278,28 +254,86 @@ clean_build: px.TaskSpec = px.TaskSpec("clean_build", fn=lambda: clean_build_dir
def main() -> None: def main() -> None:
"""Python 打包工具主函数.""" """Python 打包工具主函数."""
runner = px.CliRunner( parser = argparse.ArgumentParser(
strategy="thread",
description="PackTool - Python 打包工具", description="PackTool - Python 打包工具",
graphs={ usage="packtool <command> [options]",
# 源码打包
"src": px.Graph.from_specs([pack_source_default]),
# 依赖打包
"deps": px.Graph.from_specs([pack_deps_default]),
# Wheel 打包
"wheel": px.Graph.from_specs([pack_wheel_default]),
# 嵌入式 Python 安装
"embed": px.Graph.from_specs([install_embed_default]),
# ZIP 打包
"zip": px.Graph.from_specs([create_zip_default]),
# 清理构建目录
"clean": px.Graph.from_specs([clean_build]),
# 完整打包流程
"all": px.Graph.from_specs([
pack_source_default,
pack_deps_default,
pack_wheel_default,
]),
},
) )
runner.run_cli() subparsers = parser.add_subparsers(dest="command", help="可用命令")
# 源码打包命令
src_parser = subparsers.add_parser("src", help="打包项目源码")
src_parser.add_argument("--project-dir", type=str, default=".", help="项目目录")
src_parser.add_argument("--output-dir", type=str, default=DEFAULT_BUILD_DIR, help="输出目录")
# 依赖打包命令
deps_parser = subparsers.add_parser("deps", help="打包项目依赖")
deps_parser.add_argument("--lib-dir", type=str, default=DEFAULT_LIB_DIR, help="依赖库目录")
deps_parser.add_argument("dependencies", nargs="*", help="依赖列表")
# Wheel 打包命令
wheel_parser = subparsers.add_parser("wheel", help="打包项目为 wheel 文件")
wheel_parser.add_argument("--project-dir", type=str, default=".", help="项目目录")
wheel_parser.add_argument("--output-dir", type=str, default=DEFAULT_DIST_DIR, help="输出目录")
# 嵌入式 Python 安装命令
embed_parser = subparsers.add_parser("embed", help="安装嵌入式 Python")
embed_parser.add_argument("--version", type=str, default="3.10", help="Python 版本")
embed_parser.add_argument("--output-dir", type=str, default="python", help="输出目录")
# ZIP 打包命令
zip_parser = subparsers.add_parser("zip", help="创建 ZIP 打包文件")
zip_parser.add_argument("--source-dir", type=str, default=".", help="源目录")
zip_parser.add_argument("--output-file", type=str, default="package.zip", help="输出文件")
# 清理命令
subparsers.add_parser("clean", help="清理构建目录")
args = parser.parse_args()
if args.command == "src":
graph = px.Graph.from_specs([
px.TaskSpec(
"pack_source",
fn=pack_source,
args=(Path(args.project_dir), Path(args.output_dir)),
)
])
elif args.command == "deps":
graph = px.Graph.from_specs([
px.TaskSpec(
"pack_deps",
fn=pack_dependencies,
args=(Path(args.lib_dir), args.dependencies),
)
])
elif args.command == "wheel":
graph = px.Graph.from_specs([
px.TaskSpec(
"pack_wheel",
fn=pack_wheel,
args=(Path(args.project_dir), Path(args.output_dir)),
)
])
elif args.command == "embed":
graph = px.Graph.from_specs([
px.TaskSpec(
"install_embed",
fn=install_embed_python,
args=(args.version, Path(args.output_dir)),
)
])
elif args.command == "zip":
graph = px.Graph.from_specs([
px.TaskSpec(
"create_zip",
fn=create_zip_package,
args=(Path(args.source_dir), Path(args.output_file)),
)
])
elif args.command == "clean":
graph = px.Graph.from_specs([px.TaskSpec("clean_build", fn=clean_build_dir, args=(Path(DEFAULT_BUILD_DIR),))])
else:
parser.print_help()
return
px.run(graph, strategy="thread")
+165 -107
View File
@@ -6,6 +6,7 @@
from __future__ import annotations from __future__ import annotations
import argparse
from pathlib import Path from pathlib import Path
import pyflowx as px import pyflowx as px
@@ -340,78 +341,6 @@ def pdf_repair(input_path: Path, output_path: Path) -> None:
print(f"修复完成: {output_path}") print(f"修复完成: {output_path}")
# ============================================================================
# TaskSpec 定义
# ============================================================================
# PDF 合并
pdf_merge_default: px.TaskSpec = px.TaskSpec("pdf_merge", fn=lambda: pdf_merge([], Path("merged.pdf")))
# PDF 拆分
pdf_split_default: px.TaskSpec = px.TaskSpec("pdf_split", fn=lambda: pdf_split(Path("input.pdf"), Path("split")))
# PDF 压缩
pdf_compress_default: px.TaskSpec = px.TaskSpec(
"pdf_compress", fn=lambda: pdf_compress(Path("input.pdf"), Path("compressed.pdf"))
)
# PDF 加密
pdf_encrypt_default: px.TaskSpec = px.TaskSpec(
"pdf_encrypt", fn=lambda: pdf_encrypt(Path("input.pdf"), Path("encrypted.pdf"), "password")
)
# PDF 解密
pdf_decrypt_default: px.TaskSpec = px.TaskSpec(
"pdf_decrypt", fn=lambda: pdf_decrypt(Path("input.pdf"), Path("decrypted.pdf"), "password")
)
# PDF 提取文本
pdf_extract_text_default: px.TaskSpec = px.TaskSpec(
"pdf_extract_text", fn=lambda: pdf_extract_text(Path("input.pdf"), Path("output.txt"))
)
# PDF 提取图片
pdf_extract_images_default: px.TaskSpec = px.TaskSpec(
"pdf_extract_images", fn=lambda: pdf_extract_images(Path("input.pdf"), Path("images"))
)
# PDF 添加水印
pdf_watermark_default: px.TaskSpec = px.TaskSpec(
"pdf_watermark", fn=lambda: pdf_add_watermark(Path("input.pdf"), Path("watermarked.pdf"))
)
# PDF 旋转
pdf_rotate_default: px.TaskSpec = px.TaskSpec(
"pdf_rotate", fn=lambda: pdf_rotate(Path("input.pdf"), Path("rotated.pdf"), 90)
)
# PDF 裁剪
pdf_crop_default: px.TaskSpec = px.TaskSpec(
"pdf_crop", fn=lambda: pdf_crop(Path("input.pdf"), Path("cropped.pdf"), (10, 10, 10, 10))
)
# PDF 信息
pdf_info_default: px.TaskSpec = px.TaskSpec("pdf_info", fn=lambda: pdf_info(Path("input.pdf")))
# PDF OCR
pdf_ocr_default: px.TaskSpec = px.TaskSpec("pdf_ocr", fn=lambda: pdf_ocr(Path("input.pdf"), Path("ocr.pdf")))
# PDF 重排
pdf_reorder_default: px.TaskSpec = px.TaskSpec(
"pdf_reorder", fn=lambda: pdf_reorder(Path("input.pdf"), Path("reordered.pdf"), [])
)
# PDF 转图片
pdf_to_images_default: px.TaskSpec = px.TaskSpec(
"pdf_to_images", fn=lambda: pdf_to_images(Path("input.pdf"), Path("images"))
)
# PDF 修复
pdf_repair_default: px.TaskSpec = px.TaskSpec(
"pdf_repair", fn=lambda: pdf_repair(Path("input.pdf"), Path("repaired.pdf"))
)
# ============================================================================ # ============================================================================
# CLI Runner # CLI Runner
# ============================================================================ # ============================================================================
@@ -419,40 +348,169 @@ pdf_repair_default: px.TaskSpec = px.TaskSpec(
def main() -> None: def main() -> None:
"""PDF 工具主函数.""" """PDF 工具主函数."""
runner = px.CliRunner( parser = argparse.ArgumentParser(
strategy="thread",
description="PDFTool - PDF 文件工具集", description="PDFTool - PDF 文件工具集",
graphs={ usage="pdftool <command> [options]",
# 合并 PDF
"m": px.Graph.from_specs([pdf_merge_default]),
# 拆分 PDF
"s": px.Graph.from_specs([pdf_split_default]),
# 压缩 PDF
"c": px.Graph.from_specs([pdf_compress_default]),
# 加密 PDF
"e": px.Graph.from_specs([pdf_encrypt_default]),
# 解密 PDF
"d": px.Graph.from_specs([pdf_decrypt_default]),
# 提取文本
"xt": px.Graph.from_specs([pdf_extract_text_default]),
# 提取图片
"xi": px.Graph.from_specs([pdf_extract_images_default]),
# 添加水印
"w": px.Graph.from_specs([pdf_watermark_default]),
# 旋转 PDF
"r": px.Graph.from_specs([pdf_rotate_default]),
# 裁剪 PDF
"crop": px.Graph.from_specs([pdf_crop_default]),
# 显示信息
"i": px.Graph.from_specs([pdf_info_default]),
# OCR 识别
"ocr": px.Graph.from_specs([pdf_ocr_default]),
# 重排页面
"order": px.Graph.from_specs([pdf_reorder_default]),
# 转换图片
"img": px.Graph.from_specs([pdf_to_images_default]),
# 修复 PDF
"repair": px.Graph.from_specs([pdf_repair_default]),
},
) )
runner.run_cli() subparsers = parser.add_subparsers(dest="command", help="可用命令")
# 合并 PDF 命令
merge_parser = subparsers.add_parser("m", help="合并 PDF 文件")
merge_parser.add_argument("inputs", nargs="+", help="输入 PDF 文件路径")
merge_parser.add_argument("--output", type=str, default="merged.pdf", help="输出文件路径")
# 拆分 PDF 命令
split_parser = subparsers.add_parser("s", help="拆分 PDF 文件为单页")
split_parser.add_argument("input", help="输入 PDF 文件路径")
split_parser.add_argument("--output-dir", type=str, default="split", help="输出目录")
# 压缩 PDF 命令
compress_parser = subparsers.add_parser("c", help="压缩 PDF 文件")
compress_parser.add_argument("input", help="输入 PDF 文件路径")
compress_parser.add_argument("--output", type=str, default="compressed.pdf", help="输出文件路径")
# 加密 PDF 命令
encrypt_parser = subparsers.add_parser("e", help="加密 PDF 文件")
encrypt_parser.add_argument("input", help="输入 PDF 文件路径")
encrypt_parser.add_argument("--output", type=str, default="encrypted.pdf", help="输出文件路径")
encrypt_parser.add_argument("--password", type=str, required=True, help="密码")
# 解密 PDF 命令
decrypt_parser = subparsers.add_parser("d", help="解密 PDF 文件")
decrypt_parser.add_argument("input", help="输入 PDF 文件路径")
decrypt_parser.add_argument("--output", type=str, default="decrypted.pdf", help="输出文件路径")
decrypt_parser.add_argument("--password", type=str, required=True, help="密码")
# 提取文本命令
extract_text_parser = subparsers.add_parser("xt", help="提取 PDF 文本")
extract_text_parser.add_argument("input", help="输入 PDF 文件路径")
extract_text_parser.add_argument("--output", type=str, default="output.txt", help="输出文件路径")
# 提取图片命令
extract_images_parser = subparsers.add_parser("xi", help="提取 PDF 图片")
extract_images_parser.add_argument("input", help="输入 PDF 文件路径")
extract_images_parser.add_argument("--output-dir", type=str, default="images", help="输出目录")
# 添加水印命令
watermark_parser = subparsers.add_parser("w", help="添加 PDF 水印")
watermark_parser.add_argument("input", help="输入 PDF 文件路径")
watermark_parser.add_argument("--output", type=str, default="watermarked.pdf", help="输出文件路径")
watermark_parser.add_argument("--text", type=str, default="CONFIDENTIAL", help="水印文本")
# 旋转 PDF 命令
rotate_parser = subparsers.add_parser("r", help="旋转 PDF 页面")
rotate_parser.add_argument("input", help="输入 PDF 文件路径")
rotate_parser.add_argument("--output", type=str, default="rotated.pdf", help="输出文件路径")
rotate_parser.add_argument("--rotation", type=int, default=90, help="旋转角度 (90, 180, 270)")
# 裁剪 PDF 命令
crop_parser = subparsers.add_parser("crop", help="裁剪 PDF 页面")
crop_parser.add_argument("input", help="输入 PDF 文件路径")
crop_parser.add_argument("--output", type=str, default="cropped.pdf", help="输出文件路径")
crop_parser.add_argument("--left", type=int, default=10, help="左边裁剪")
crop_parser.add_argument("--top", type=int, default=10, help="顶部裁剪")
crop_parser.add_argument("--right", type=int, default=10, help="右边裁剪")
crop_parser.add_argument("--bottom", type=int, default=10, help="底部裁剪")
# 显示信息命令
info_parser = subparsers.add_parser("i", help="显示 PDF 信息")
info_parser.add_argument("input", help="输入 PDF 文件路径")
# OCR 识别命令
ocr_parser = subparsers.add_parser("ocr", help="PDF OCR 识别")
ocr_parser.add_argument("input", help="输入 PDF 文件路径")
ocr_parser.add_argument("--output", type=str, default="ocr.pdf", help="输出文件路径")
ocr_parser.add_argument("--lang", type=str, default="chi_sim+eng", help="OCR 语言")
# 转换图片命令
to_images_parser = subparsers.add_parser("img", help="PDF 转图片")
to_images_parser.add_argument("input", help="输入 PDF 文件路径")
to_images_parser.add_argument("--output-dir", type=str, default="images", help="输出目录")
to_images_parser.add_argument("--dpi", type=int, default=300, help="图片 DPI")
# 修复 PDF 命令
repair_parser = subparsers.add_parser("repair", help="修复 PDF 文件")
repair_parser.add_argument("input", help="输入 PDF 文件路径")
repair_parser.add_argument("--output", type=str, default="repaired.pdf", help="输出文件路径")
args = parser.parse_args()
if args.command == "m":
graph = px.Graph.from_specs([
px.TaskSpec("pdf_merge", fn=pdf_merge, args=([Path(p) for p in args.inputs], Path(args.output)))
])
elif args.command == "s":
graph = px.Graph.from_specs([
px.TaskSpec("pdf_split", fn=pdf_split, args=(Path(args.input), Path(args.output_dir)))
])
elif args.command == "c":
graph = px.Graph.from_specs([
px.TaskSpec("pdf_compress", fn=pdf_compress, args=(Path(args.input), Path(args.output)))
])
elif args.command == "e":
graph = px.Graph.from_specs([
px.TaskSpec("pdf_encrypt", fn=pdf_encrypt, args=(Path(args.input), Path(args.output), args.password))
])
elif args.command == "d":
graph = px.Graph.from_specs([
px.TaskSpec("pdf_decrypt", fn=pdf_decrypt, args=(Path(args.input), Path(args.output), args.password))
])
elif args.command == "xt":
graph = px.Graph.from_specs([
px.TaskSpec("pdf_extract_text", fn=pdf_extract_text, args=(Path(args.input), Path(args.output)))
])
elif args.command == "xi":
graph = px.Graph.from_specs([
px.TaskSpec("pdf_extract_images", fn=pdf_extract_images, args=(Path(args.input), Path(args.output_dir)))
])
elif args.command == "w":
graph = px.Graph.from_specs([
px.TaskSpec(
"pdf_watermark",
fn=pdf_add_watermark,
args=(Path(args.input), Path(args.output)),
kwargs={"text": args.text},
)
])
elif args.command == "r":
graph = px.Graph.from_specs([
px.TaskSpec(
"pdf_rotate",
fn=pdf_rotate,
args=(Path(args.input), Path(args.output)),
kwargs={"rotation": args.rotation},
)
])
elif args.command == "crop":
graph = px.Graph.from_specs([
px.TaskSpec(
"pdf_crop",
fn=pdf_crop,
args=(Path(args.input), Path(args.output)),
kwargs={"margins": (args.left, args.top, args.right, args.bottom)},
)
])
elif args.command == "i":
graph = px.Graph.from_specs([px.TaskSpec("pdf_info", fn=pdf_info, args=(Path(args.input),))])
elif args.command == "ocr":
graph = px.Graph.from_specs([
px.TaskSpec("pdf_ocr", fn=pdf_ocr, args=(Path(args.input), Path(args.output)), kwargs={"lang": args.lang})
])
elif args.command == "img":
graph = px.Graph.from_specs([
px.TaskSpec(
"pdf_to_images",
fn=pdf_to_images,
args=(Path(args.input), Path(args.output_dir)),
kwargs={"dpi": args.dpi},
)
])
elif args.command == "repair":
graph = px.Graph.from_specs([
px.TaskSpec("pdf_repair", fn=pdf_repair, args=(Path(args.input), Path(args.output)))
])
else:
parser.print_help()
return
px.run(graph, strategy="thread")
+64 -41
View File
@@ -6,6 +6,7 @@
from __future__ import annotations from __future__ import annotations
import argparse
import fnmatch import fnmatch
import subprocess import subprocess
from pathlib import Path from pathlib import Path
@@ -20,12 +21,10 @@ PACKAGE_DIR = "packages"
REQUIREMENTS_FILE = "requirements.txt" REQUIREMENTS_FILE = "requirements.txt"
# 受保护的包名集合 # 受保护的包名集合
_PROTECTED_PACKAGES: frozenset[str] = frozenset( _PROTECTED_PACKAGES: frozenset[str] = frozenset({
{ "pyflowx",
"pyflowx", "bitool",
"bitool", })
}
)
# ============================================================================ # ============================================================================
@@ -118,14 +117,6 @@ def pip_freeze() -> None:
Path(REQUIREMENTS_FILE).write_text(result.stdout) Path(REQUIREMENTS_FILE).write_text(result.stdout)
# ============================================================================
# TaskSpec 定义
# ============================================================================
pip_install: px.TaskSpec = px.TaskSpec("pip_install", cmd=["pip", "install", "."])
pip_upgrade: px.TaskSpec = px.TaskSpec("pip_upgrade", cmd=["python", "-m", "pip", "install", "--upgrade", "pip"])
# ============================================================================ # ============================================================================
# CLI Runner # CLI Runner
# ============================================================================ # ============================================================================
@@ -133,32 +124,64 @@ pip_upgrade: px.TaskSpec = px.TaskSpec("pip_upgrade", cmd=["python", "-m", "pip"
def main() -> None: def main() -> None:
"""pip 工具主函数.""" """pip 工具主函数."""
runner = px.CliRunner( parser = argparse.ArgumentParser(
strategy="thread",
description="PipTool - pip 包管理工具", description="PipTool - pip 包管理工具",
graphs={ usage="piptool <command> [options]",
# 安装包
"i": px.Graph.from_specs([pip_install]),
# 升级 pip
"up": px.Graph.from_specs([pip_upgrade]),
# 卸载包 (需要参数)
"u": px.Graph.from_specs(
[
px.TaskSpec("pip_uninstall", fn=lambda: pip_uninstall([])),
]
),
# 下载包
"d": px.Graph.from_specs(
[
px.TaskSpec("pip_download", fn=lambda: pip_download([])),
]
),
# 冻结依赖
"f": px.Graph.from_specs(
[
px.TaskSpec("pip_freeze", fn=pip_freeze),
]
),
},
) )
runner.run_cli() subparsers = parser.add_subparsers(dest="command", help="可用命令")
# 安装命令
install_parser = subparsers.add_parser("i", help="安装包")
install_parser.add_argument("packages", nargs="+", help="要安装的包名")
# 卸载命令
uninstall_parser = subparsers.add_parser("u", help="卸载包")
uninstall_parser.add_argument("packages", nargs="+", help="要卸载的包名 (支持通配符)")
# 重装命令
reinstall_parser = subparsers.add_parser("r", help="重新安装包")
reinstall_parser.add_argument("packages", nargs="+", help="要重装的包名")
reinstall_parser.add_argument("--offline", action="store_true", help="使用离线模式")
# 下载命令
download_parser = subparsers.add_parser("d", help="下载包")
download_parser.add_argument("packages", nargs="+", help="要下载的包名")
download_parser.add_argument("--offline", action="store_true", help="使用离线模式")
# 升级 pip 命令
subparsers.add_parser("up", help="升级 pip")
# 冻结依赖命令
subparsers.add_parser("f", help="冻结依赖到 requirements.txt")
args = parser.parse_args()
if args.command == "i":
graph = px.Graph.from_specs([
px.TaskSpec("pip_install", cmd=["pip", "install", *args.packages], verbose=True)
])
elif args.command == "u":
graph = px.Graph.from_specs([
px.TaskSpec("pip_uninstall", fn=pip_uninstall, args=(args.packages,), verbose=True)
])
elif args.command == "r":
graph = px.Graph.from_specs([
px.TaskSpec("pip_reinstall", fn=pip_reinstall, args=(args.packages,), kwargs={"offline": args.offline}, verbose=True)
])
elif args.command == "d":
graph = px.Graph.from_specs([
px.TaskSpec("pip_download", fn=pip_download, args=(args.packages,), kwargs={"offline": args.offline}, verbose=True)
])
elif args.command == "up":
graph = px.Graph.from_specs([
px.TaskSpec("pip_upgrade", cmd=["python", "-m", "pip", "install", "--upgrade", "pip"], verbose=True)
])
elif args.command == "f":
graph = px.Graph.from_specs([
px.TaskSpec("pip_freeze", fn=pip_freeze, verbose=True)
])
else:
parser.print_help()
return
px.run(graph, strategy="thread")
+27 -17
View File
@@ -124,14 +124,6 @@ $bitmap.Dispose()
print(f"截图已保存: {output_path}") print(f"截图已保存: {output_path}")
# ============================================================================
# TaskSpec 定义
# ============================================================================
screenshot_full: px.TaskSpec = px.TaskSpec("screenshot_full", fn=take_screenshot_full)
screenshot_area: px.TaskSpec = px.TaskSpec("screenshot_area", fn=take_screenshot_area)
# ============================================================================ # ============================================================================
# CLI Runner # CLI Runner
# ============================================================================ # ============================================================================
@@ -139,14 +131,32 @@ screenshot_area: px.TaskSpec = px.TaskSpec("screenshot_area", fn=take_screenshot
def main() -> None: def main() -> None:
"""截图工具主函数.""" """截图工具主函数."""
runner = px.CliRunner( parser = argparse.ArgumentParser(
strategy="thread",
description="Screenshot - 截图工具", description="Screenshot - 截图工具",
graphs={ usage="screenshot <command> [options]",
# 全屏截图
"f": px.Graph.from_specs([screenshot_full]),
# 区域截图
"a": px.Graph.from_specs([screenshot_area]),
},
) )
runner.run_cli() subparsers = parser.add_subparsers(dest="command", help="可用命令")
# 全屏截图命令
full_parser = subparsers.add_parser("full", help="全屏截图")
full_parser.add_argument("--filename", type=str, help="文件名")
# 区域截图命令
area_parser = subparsers.add_parser("area", help="区域截图")
area_parser.add_argument("--filename", type=str, help="文件名")
args = parser.parse_args()
if args.command == "full":
graph = px.Graph.from_specs([
px.TaskSpec("screenshot_full", fn=take_screenshot_full, kwargs={"filename": args.filename})
])
elif args.command == "area":
graph = px.Graph.from_specs([
px.TaskSpec("screenshot_area", fn=take_screenshot_area, kwargs={"filename": args.filename})
])
else:
parser.print_help()
return
px.run(graph, strategy="thread")
+19 -18
View File
@@ -89,17 +89,6 @@ grep -qF '{pub_key.split()[1]}' authorized_keys 2>/dev/null || echo '{pub_key}'
sys.exit(1) sys.exit(1)
# ============================================================================
# TaskSpec 定义
# ============================================================================
# SSH 密钥部署需要参数,这里提供默认示例
ssh_deploy_default: px.TaskSpec = px.TaskSpec(
"ssh_deploy_default",
fn=lambda: ssh_copy_id("localhost", "user", "password"),
)
# ============================================================================ # ============================================================================
# CLI Runner # CLI Runner
# ============================================================================ # ============================================================================
@@ -107,12 +96,24 @@ ssh_deploy_default: px.TaskSpec = px.TaskSpec(
def main() -> None: def main() -> None:
"""SSH 密钥部署工具主函数.""" """SSH 密钥部署工具主函数."""
runner = px.CliRunner( parser = argparse.ArgumentParser(
strategy="thread",
description="SSHCopyID - SSH 密钥部署工具", description="SSHCopyID - SSH 密钥部署工具",
graphs={ usage="sshcopyid <hostname> <username> <password> [--port PORT] [--keypath KEYPATH]",
# 部署 SSH 密钥 (需要参数)
"d": px.Graph.from_specs([ssh_deploy_default]),
},
) )
runner.run_cli() parser.add_argument("hostname", type=str, help="远程服务器主机名或 IP 地址")
parser.add_argument("username", type=str, help="远程服务器用户名")
parser.add_argument("password", type=str, help="远程服务器密码")
parser.add_argument("--port", type=int, default=22, help="SSH 端口 (默认: 22)")
parser.add_argument("--keypath", type=str, default="~/.ssh/id_rsa.pub", help="公钥文件路径")
parser.add_argument("--timeout", type=int, default=30, help="SSH 操作超时秒数 (默认: 30)")
args = parser.parse_args()
graph = px.Graph.from_specs([
px.TaskSpec(
"ssh_deploy",
fn=ssh_copy_id,
args=(args.hostname, args.username, args.password),
kwargs={"port": args.port, "keypath": args.keypath, "timeout": args.timeout},
)
])
px.run(graph, strategy="thread")
+15 -113
View File
@@ -5,16 +5,11 @@
from __future__ import annotations from __future__ import annotations
import argparse
import shutil import shutil
import subprocess
from pathlib import Path from pathlib import Path
import pyflowx as px import pyflowx as px
from pyflowx.conditions import Constants
# ============================================================================
# 辅助函数
# ============================================================================
def which_command(command: str) -> Path | None: def which_command(command: str) -> Path | None:
@@ -31,119 +26,26 @@ def which_command(command: str) -> Path | None:
命令路径, 如果未找到则返回 None 命令路径, 如果未找到则返回 None
""" """
cmd_path = shutil.which(command) cmd_path = shutil.which(command)
return Path(cmd_path) if cmd_path else None
def which_all_commands(commands: list[str]) -> dict[str, Path | None]:
"""查找多个命令路径.
Parameters
----------
commands : list[str]
命令名称列表
Returns
-------
dict[str, Path | None]
命令路径字典
"""
results: dict[str, Path | None] = {}
for cmd in commands:
results[cmd] = which_command(cmd)
return results
def where_command_windows(command: str) -> list[Path]:
"""Windows 下使用 where 命令查找所有匹配路径.
Parameters
----------
command : str
命令名称
Returns
-------
list[Path]
匹配的路径列表
"""
if not Constants.IS_WINDOWS:
return []
try:
result = subprocess.run(
["where", command],
capture_output=True,
text=True,
check=True,
)
paths = [Path(line.strip()) for line in result.stdout.strip().split("\n") if line.strip()]
return paths
except subprocess.CalledProcessError:
return []
def print_command_info(command: str) -> None:
"""打印命令信息.
Parameters
----------
command : str
命令名称
"""
cmd_path = which_command(command)
if cmd_path: if cmd_path:
print(f"{command}: {cmd_path}") print(f"匹配路径: - {cmd_path}")
if Constants.IS_WINDOWS: return Path(cmd_path)
all_paths = where_command_windows(command)
if len(all_paths) > 1:
print("所有匹配路径:")
for path in all_paths:
print(f" {path}")
else: else:
print(f"{command}: 未找到") print(f"{command}: 未找到")
return None
# ============================================================================
# TaskSpec 定义
# ============================================================================
which_python: px.TaskSpec = px.TaskSpec("which_python", fn=lambda: print_command_info("python"))
which_pip: px.TaskSpec = px.TaskSpec("which_pip", fn=lambda: print_command_info("pip"))
which_node: px.TaskSpec = px.TaskSpec("which_node", fn=lambda: print_command_info("node"))
which_npm: px.TaskSpec = px.TaskSpec("which_npm", fn=lambda: print_command_info("npm"))
which_git: px.TaskSpec = px.TaskSpec("which_git", fn=lambda: print_command_info("git"))
which_uv: px.TaskSpec = px.TaskSpec("which_uv", fn=lambda: print_command_info("uv"))
which_rustc: px.TaskSpec = px.TaskSpec("which_rustc", fn=lambda: print_command_info("rustc"))
which_cargo: px.TaskSpec = px.TaskSpec("which_cargo", fn=lambda: print_command_info("cargo"))
# ============================================================================
# CLI Runner
# ============================================================================
def main() -> None: def main() -> None:
"""命令查找工具主函数.""" """命令查找工具主函数."""
runner = px.CliRunner( parser = argparse.ArgumentParser(
strategy="thread",
description="Which - 命令查找工具", description="Which - 命令查找工具",
graphs={ usage="which <command> [command ...]",
# 查找 python
"py": px.Graph.from_specs([which_python]),
# 查找 pip
"pip": px.Graph.from_specs([which_pip]),
# 查找 node
"node": px.Graph.from_specs([which_node]),
# 查找 npm
"npm": px.Graph.from_specs([which_npm]),
# 查找 git
"git": px.Graph.from_specs([which_git]),
# 查找 uv
"uv": px.Graph.from_specs([which_uv]),
# 查找 rustc
"rustc": px.Graph.from_specs([which_rustc]),
# 查找 cargo
"cargo": px.Graph.from_specs([which_cargo]),
},
) )
runner.run_cli() parser.add_argument(
"commands",
type=str,
nargs="+",
help="要查找的命令名称 (如: python pip node npm git uv rustc cargo)",
)
args = parser.parse_args()
graph = px.Graph.from_specs([px.TaskSpec(f"which_{cmd}", fn=which_command, args=(cmd,)) for cmd in args.commands])
px.run(graph, strategy="thread")
Generated
+1 -1
View File
@@ -2184,7 +2184,7 @@ wheels = [
[[package]] [[package]]
name = "pyflowx" name = "pyflowx"
version = "0.1.6" version = "0.1.7"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "graphlib-backport", marker = "python_full_version < '3.9'" }, { name = "graphlib-backport", marker = "python_full_version < '3.9'" },