From fad964b37069403e4a3574aeef86c70999d42408 Mon Sep 17 00:00:00 2001 From: gooker_young Date: Sat, 20 Jun 2026 16:29:25 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=91=BD=E4=BB=A4?= =?UTF-8?q?=E8=A1=8C=E4=BB=BB=E5=8A=A1=E6=94=AF=E6=8C=81=E4=B8=8E=E6=9D=A1?= =?UTF-8?q?=E4=BB=B6=E6=89=A7=E8=A1=8C=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 新增条件判断模块,支持平台、环境变量、应用安装等条件检查 2. 扩展TaskSpec支持cmd参数,可直接执行shell命令或包装Python函数 3. 添加任务条件执行、工作目录设置功能 4. 重构任务执行逻辑,使用effective_fn统一处理函数与命令 5. 新增完整的命令行构建工具pymake 6. 新增配套测试用例覆盖命令执行与条件逻辑 7. 更新项目版本至0.1.2,调整入口脚本为pymake --- pyproject.toml | 2 +- src/pyflowx/__init__.py | 45 +++- src/pyflowx/cli/__init__.py | 0 src/pyflowx/cli/pymake.py | 444 ++++++++++++++++++++++++++++++++ src/pyflowx/conditions.py | 225 ++++++++++++++++ src/pyflowx/context.py | 8 +- src/pyflowx/executors.py | 26 +- src/pyflowx/graph.py | 6 + src/pyflowx/task.py | 144 ++++++++++- tests/test_taskspec_commands.py | 352 +++++++++++++++++++++++++ uv.lock | 2 +- 11 files changed, 1241 insertions(+), 13 deletions(-) create mode 100644 src/pyflowx/cli/__init__.py create mode 100644 src/pyflowx/cli/pymake.py create mode 100644 src/pyflowx/conditions.py create mode 100644 tests/test_taskspec_commands.py diff --git a/pyproject.toml b/pyproject.toml index 4f09c25..998b94a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ requires-python = ">=3.8" version = "0.1.2" [project.scripts] -pyflowx-demo = "pyflowx.__main__:main" +pymake = "pyflowx.cli.pymake:main" [project.optional-dependencies] dev = [ diff --git a/src/pyflowx/__init__.py b/src/pyflowx/__init__.py index e27f33a..b4aaeda 100644 --- a/src/pyflowx/__init__.py +++ b/src/pyflowx/__init__.py @@ -22,10 +22,44 @@ ]) report = px.run(graph, strategy="sequential") print(report["double"]) # [2, 4, 6] + +命令行任务示例 +-------------- + import pyflowx as px + from pyflowx.conditions import IS_WINDOWS, BuiltinConditions + + graph = px.Graph.from_specs([ + # 使用命令列表 + px.TaskSpec("list_files", cmd=["ls", "-la"]), + # 使用 shell 命令 + px.TaskSpec("check_git", cmd="git status"), + # 条件执行:仅在 Windows 上运行 + px.TaskSpec( + "win_only", + cmd=["dir"], + conditions=(IS_WINDOWS,) + ), + # 条件执行:仅在 git 已安装时运行 + px.TaskSpec( + "git_check", + cmd=["git", "--version"], + conditions=(BuiltinConditions.HAS_APP_INSTALLED("git"),) + ), + ]) + report = px.run(graph) """ from __future__ import annotations +from .conditions import ( + BuiltinConditions, + Constants, + Condition, + IS_LINUX, + IS_MACOS, + IS_POSIX, + IS_WINDOWS, +) from .context import Context, build_call_args, describe_injection from .errors import ( CycleError, @@ -41,7 +75,7 @@ from .executors import run from .graph import Graph from .report import RunReport from .storage import JSONBackend, MemoryBackend, StateBackend -from .task import TaskEvent, TaskResult, TaskSpec, TaskStatus +from .task import TaskCmd, TaskEvent, TaskResult, TaskSpec, TaskStatus __version__ = "0.1.2" @@ -52,6 +86,7 @@ __all__ = [ "TaskResult", "TaskEvent", "Context", + "TaskCmd", "Graph", "RunReport", # 执行 @@ -69,6 +104,14 @@ __all__ = [ "TaskTimeoutError", "InjectionError", "StorageError", + # 条件判断 + "Condition", + "Constants", + "BuiltinConditions", + "IS_WINDOWS", + "IS_LINUX", + "IS_MACOS", + "IS_POSIX", # 辅助(高级) "build_call_args", "describe_injection", diff --git a/src/pyflowx/cli/__init__.py b/src/pyflowx/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pyflowx/cli/pymake.py b/src/pyflowx/cli/pymake.py new file mode 100644 index 0000000..3f58a2f --- /dev/null +++ b/src/pyflowx/cli/pymake.py @@ -0,0 +1,444 @@ +"""Python 构建工具模块. + +完全替代传统的 Makefile, +提供更好的跨平台兼容性和 Python 生态集成. +""" + +from __future__ import annotations + +from pathlib import Path + +import pyflowx as px + + +class PymakeConfig: + """PyMake 配置类.""" + + # 项目根目录 + PROJECT_ROOT: str = str(Path(__file__).parent.parent.parent.parent) + CORE_DIR: str = f"{PROJECT_ROOT}/bitool-core" + CORE_PATTERN: str = f"{CORE_DIR}/target/bitool_core-*-cp*.whl" + TIMEOUT: int = 600 + + # Python 构建 + BUILD_TOOL: str = "uv" + BUILD_COMMAND: list[str] = [BUILD_TOOL, "build"] + + # Rust 构建 (maturin) + MATURIN_TOOL: str = "maturin" + MATURIN_BUILD_COMMAND: list[str] = ["maturin", "build", "-r"] + MATURIN_DEV_COMMAND: list[str] = ["maturin", "develop"] + MATURIN_BUILD_OPTIONS_WIN7: list[str] = [ + "--target", + "x86_64-win7-windows-msvc", + "-Zbuild-std", + "-i", + "python3.8", + ] + + # 文档 + DOC_BUILD_TOOL: str = "sphinx-build" + DOC_BUILD_COMMAND: list[str] = ["sphinx-build", "-b", "html", "docs", "docs/_build"] + + # 清理 + DIRS_TO_IGNORE: list[str] = [".venv"] + PYTHON_BUILD_DIRS: list[str] = ["dist", "build", "*.egg-info", "src/*.egg-info"] + + +conf = PymakeConfig() + + +def main(): + """ + ╔══════════════════════════════════════════════════════════╗ + ║ PyMake 构建工具 ║ + ╚══════════════════════════════════════════════════════════╝ + + 🔨 构建命令: + pymake b - 构建 Python 主包 (uv build) + pymake bc - 构建 Rust 核心模块 (maturin build) + pymake ba - 构建所有包 (先 Rust 后 Python) + + 📦 安装命令 (开发模式): + pymake ic - 安装 Rust 核心模块 (maturin develop) + pymake ip - 安装 Python 主包 (uv pip install -e .) + pymake ia - 安装所有包 (开发模式,推荐) + + 🧹 清理命令: + pymake cp - 清理 Python 构建产物 + pymake cc - 清理 Rust 构建产物 (cargo clean) + pymake ca - 清理所有构建产物 + + 🛠️ 开发工具: + pymake t - 运行测试 (pytest) + pymake tc - 运行测试并生成覆盖率报告 + pymake lint - 代码格式化与检查 (ruff) + pymake typecheck - 类型检查 (ty) + pymake doc - 构建文档 (sphinx) + + 🔬 多版本测试: + pymake tox - 多版本 Python 测试 (3.8-3.14) + pymake tox-install - 安装所有 Python 版本 (仅安装不测试) + + 📦 发布命令: + pymake pb - 发布到 PyPI (先 Rust 后 Python) + + 💡 常用工作流: + 1. 初始化开发环境: pymake ia + 2. 日常开发: pymake lint && pymake t + 3. 构建发布包: pymake ba + 4. 多版本兼容性测试: pymake tox + 5. 发布到 PyPI: pymake pb + 6. 清理重新开始: pymake ca && pymake ia + + 📝 示例: + pymake ba # 构建所有包 + pymake ia # 安装开发环境 + pymake t # 运行测试 + pymake tox # 多版本兼容性测试 + pymake lint # 格式化代码 + pymake ca # 清理所有构建产物 + """ + pymake_graph = px.Graph.from_specs( + [ + px.TaskSpec("b", cmd=conf.BUILD_COMMAND), + px.TaskSpec("bc", cmd=conf.MATURIN_BUILD_COMMAND), + px.TaskSpec("ic", cmd=conf.DOC_BUILD_COMMAND), + ] + ) + px.run(pymake_graph) + + +# class PyMakeSkill(MapSkill): +# """PyMake 构建技能.""" + +# name: ClassVar[str] = "pymake" +# description: ClassVar[str] = "Bitool PyMake - Python构建工具" + +# @override +# def create_scheduler_map( +# self, +# args: argparse.Namespace, +# ) -> dict[str, CommandScheduler] | None: +# return { +# # === 构建命令 === +# # 构建 Python 包 +# "b": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=conf.BUILD_COMMAND, +# allow_conditions=[_UV_CONDITION], +# timeout=conf.TIMEOUT, +# ), +# ], +# ), +# # 构建 Rust 核心模块 +# "bc": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=_get_maturin_build_command(), +# cwd=Path(conf.CORE_DIR), +# allow_conditions=[_MATURIN_CONDITION], +# timeout=conf.TIMEOUT, +# ), +# ], +# ), +# # 构建双包(先 Rust 后 Python) +# "ba": CommandScheduler( +# commands=[ +# RunCommand( +# name="maturin_build", +# cmd=_get_maturin_build_command(), +# cwd=Path(conf.CORE_DIR), +# allow_conditions=[_MATURIN_CONDITION], +# timeout=conf.TIMEOUT, +# ), +# RunCommand( +# name="uv_build", +# cmd=conf.BUILD_COMMAND, +# allow_conditions=[_UV_CONDITION], +# timeout=conf.TIMEOUT, +# dependencies=["maturin_build"], +# ), +# ], +# ), +# # === 安装命令(开发模式) === +# # 安装 Rust 核心模块 +# "ic": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=conf.MATURIN_DEV_COMMAND, +# cwd=Path(conf.CORE_DIR), +# allow_conditions=[_MATURIN_CONDITION], +# ), +# ], +# ), +# # 安装 Python 主包 +# "ip": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=["uv", "pip", "install", "-e", "."], +# allow_conditions=[_UV_CONDITION], +# success_codes={0, 2}, +# ), +# ], +# ), +# # 安装双包(开发模式) +# "ia": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=conf.MATURIN_DEV_COMMAND, +# cwd=Path(conf.CORE_DIR), +# allow_conditions=[_MATURIN_CONDITION], +# ), +# RunCommand( +# cmd=["uv", "pip", "install", "-e", "."], +# allow_conditions=[_UV_CONDITION], +# success_codes={0, 2}, +# ), +# ], +# ), +# # === 清理命令 === +# # 清理 Python 构建产物 +# "cp": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=["rm", "-rf", *conf.PYTHON_BUILD_DIRS], +# allow_conditions=[_GIT_CONDITION], # 使用 git clean 更安全 +# ), +# ], +# ), +# # 清理 Rust 构建产物 +# "cc": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=["cargo", "clean"], +# cwd=Path(conf.CORE_DIR), +# allow_conditions=[ +# _MATURIN_CONDITION, +# ], # 有 maturin 说明有 cargo +# ), +# ], +# ), +# # 清理所有构建产物 +# "ca": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=["cargo", "clean"], +# cwd=Path(conf.CORE_DIR), +# allow_conditions=[_MATURIN_CONDITION], +# ), +# RunCommand( +# cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE], +# allow_conditions=[_GIT_CONDITION], +# ), +# ], +# ), +# # === 开发工具 === +# # 运行测试, 跳过 slow, 并行模式 +# "t": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=[ +# "pytest", +# "-m", +# "not slow", +# "-n", +# "8", +# "--dist", +# "loadfile", +# "--color=yes", +# "--durations=10", +# ], +# allow_conditions=[_PYTEST_CONDITION], +# timeout=conf.TIMEOUT, +# ), +# ], +# ), +# # 运行测试, 非并行模式 +# "tf": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=[ +# "pytest", +# "-m", +# "not slow", +# "--dist", +# "loadfile", +# "--color=yes", +# "--durations=10", +# ], +# allow_conditions=[_PYTEST_CONDITION], +# timeout=conf.TIMEOUT, +# ), +# ], +# ), +# # 运行测试并生成覆盖率报告, 跳过 slow, 并行模式 +# # --dist loadfile: 按文件分发测试, 减少模块导入开销 +# "tc": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=[ +# "pytest", +# "-m", +# "not slow", +# "--cov", +# "-n", +# "auto", +# "--dist", +# "loadfile", +# "--tb=short", +# "-v", +# "--color=yes", +# "--durations=10", +# ], +# allow_conditions=[_PYTEST_CONDITION], +# timeout=conf.TIMEOUT, +# ), +# ], +# ), +# # 代码格式化与检查 +# "lint": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=[ +# "ruff", +# "check", +# "--fix", +# "--unsafe-fixes", +# ], +# allow_conditions=[_RUFF_CONDITION], +# timeout=conf.TIMEOUT, +# cwd=Path(conf.PROJECT_ROOT), +# ), +# ], +# ), +# # 类型检查 +# "typecheck": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=["ty", "check", "src/bitool"], +# allow_conditions=[BuiltinConditions.HAS_APP_INSTALLED("ty")], +# ), +# ], +# ), +# # 构建文档 +# "doc": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=conf.DOC_BUILD_COMMAND, +# allow_conditions=[ +# BuiltinConditions.HAS_APP_INSTALLED(conf.DOC_BUILD_TOOL), +# ], +# ), +# ], +# ), +# # 发布到 PyPI(先发布 Rust 核心模块,再发布 Python 主包) +# "pb": CommandScheduler( +# commands=[ +# # 发布 Python 主包(在项目根目录执行,依赖 Rust 发布成功) +# RunCommand( +# name="publish-python", +# cmd=["hatch", "publish"], +# cwd=Path(conf.PROJECT_ROOT), +# allow_conditions=[_HATCH_CONDITION], +# timeout=conf.TIMEOUT, +# ), +# ], +# ), +# "pba": CommandScheduler( +# commands=[ +# # 发布 Rust 核心模块(在 core 目录执行) +# RunCommand( +# name="publish-rust", +# # --disable-progress-bar: 避免 Windows GBK 控制台渲染 rich 进度条 +# # 中的 \u2022 字符导致 UnicodeEncodeError +# cmd=[ +# "twine", +# "upload", +# "--disable-progress-bar", +# conf.CORE_PATTERN, +# ], +# cwd=Path(conf.CORE_DIR), +# allow_conditions=[_MATURIN_CONDITION], +# timeout=conf.TIMEOUT, +# ), +# RunCommand( +# name="publish-python", +# cmd=["hatch", "publish"], +# cwd=Path(conf.PROJECT_ROOT), +# allow_conditions=[_HATCH_CONDITION], +# timeout=conf.TIMEOUT, +# dependencies=["publish-rust"], +# ), +# ], +# ), +# "pbc": CommandScheduler( +# commands=[ +# # 发布 Rust 核心模块(在 core 目录执行) +# RunCommand( +# name="publish-rust", +# cmd=["maturin", "publish"], +# cwd=Path(conf.CORE_DIR), +# allow_conditions=[_MATURIN_CONDITION], +# timeout=conf.TIMEOUT, +# ), +# ], +# ), +# # === 多版本测试命令 === +# # 运行多版本 Python 测试 (tox) +# "tox": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=["tox", "-p", "auto"], +# allow_conditions=[_TOX_CONDITION], +# timeout=conf.TIMEOUT, +# ), +# ], +# ), +# # 安装多版本 Python (仅安装不测试) +# "tox-install": CommandScheduler( +# commands=[ +# RunCommand( +# cmd=[ +# "uv", +# "python", +# "install", +# "3.8", +# "3.9", +# "3.10", +# "3.11", +# "3.12", +# "3.13", +# "3.14", +# ], +# allow_conditions=[_UV_CONDITION], +# timeout=600, +# ), +# ], +# ), +# } + + +# def _get_maturin_build_command() -> list[str]: +# """获取 maturin 构建命令(根据平台自动添加参数). + +# Returns +# ------- +# list[str] +# 完整的 maturin 构建命令列表. +# """ +# base_cmd = conf.MATURIN_BUILD_COMMAND.copy() +# if Constants.IS_WINDOWS: +# base_cmd.extend(conf.MATURIN_BUILD_OPTIONS_WIN7) +# return base_cmd + + +# # 命令条件判断 +# _MATURIN_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.MATURIN_TOOL) +# _PYTEST_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("pytest") +# _UV_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.BUILD_TOOL) +# _HATCH_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("hatch") +# _RUFF_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("ruff") +# _GIT_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("git") +# _TOX_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("tox") diff --git a/src/pyflowx/conditions.py b/src/pyflowx/conditions.py new file mode 100644 index 0000000..b6bc993 --- /dev/null +++ b/src/pyflowx/conditions.py @@ -0,0 +1,225 @@ +"""条件判断模块. + +提供平台条件、应用安装条件等预定义条件判断函数, +用于 TaskSpec 的条件执行功能. +""" + +from __future__ import annotations + +import shutil +import sys +from typing import Callable, Optional + +# 条件判断函数类型 +Condition = Callable[[], bool] + + +class Constants: + """常量定义.""" + + IS_WINDOWS: bool = sys.platform == "win32" + IS_LINUX: bool = sys.platform == "linux" + IS_MACOS: bool = sys.platform == "darwin" + IS_POSIX: bool = sys.platform != "win32" + + +class BuiltinConditions: + """内置条件判断函数集合.""" + + @staticmethod + def IS_WINDOWS() -> bool: + """是否为 Windows 平台.""" + return Constants.IS_WINDOWS + + @staticmethod + def IS_LINUX() -> bool: + bool = Constants.IS_LINUX + return bool + + @staticmethod + def IS_MACOS() -> bool: + """是否为 macOS 平台.""" + return Constants.IS_MACOS + + @staticmethod + def IS_POSIX() -> bool: + """是否为 POSIX 系统 (Linux/macOS).""" + return Constants.IS_POSIX + + @staticmethod + def PYTHON_VERSION(major: int, minor: Optional[int] = None) -> bool: + """检查 Python 版本是否匹配. + + Parameters + ---------- + major : int + 主版本号. + minor : int | None + 次版本号, 若为 None 则仅检查主版本. + + Returns + ------- + bool + 版本是否匹配. + """ + if minor is None: + return sys.version_info.major == major + return sys.version_info.major == major and sys.version_info.minor == minor + + @staticmethod + def PYTHON_VERSION_AT_LEAST(major: int, minor: int = 0) -> bool: + """检查 Python 版本是否 >= 指定版本. + + Parameters + ---------- + major : int + 主版本号. + minor : int + 次版本号. + + Returns + ------- + bool + 当前版本是否 >= 指定版本. + """ + return sys.version_info >= (major, minor) + + @staticmethod + def HAS_APP_INSTALLED(app_name: str) -> Condition: + """检查指定应用是否已安装. + + Parameters + ---------- + app_name : str + 应用名称 (如 "git", "python", "pytest"). + + Returns + ------- + Condition + 条件判断函数. + """ + + def _check() -> bool: + return shutil.which(app_name) is not None + + _check.__name__ = f"HAS_APP_INSTALLED({app_name!r})" + return _check + + @staticmethod + def ENV_VAR_EXISTS(var_name: str) -> Condition: + """检查环境变量是否存在. + + Parameters + ---------- + var_name : str + 环境变量名. + + Returns + ------- + Condition + 条件判断函数. + """ + + def _check() -> bool: + return var_name in os.environ + + _check.__name__ = f"ENV_VAR_EXISTS({var_name!r})" + return _check + + @staticmethod + def ENV_VAR_EQUALS(var_name: str, value: str) -> Condition: + """检查环境变量是否等于指定值. + + Parameters + ---------- + var_name : str + 环境变量名. + value : str + 期望的值. + + Returns + ------- + Condition + 条件判断函数. + """ + + def _check() -> bool: + return os.environ.get(var_name) == value + + _check.__name__ = f"ENV_VAR_EQUALS({var_name!r}, {value!r})" + return _check + + @staticmethod + def NOT(condition: Condition) -> Condition: + """对条件取反. + + Parameters + ---------- + condition : Condition + 原始条件. + + Returns + ------- + Condition + 取反后的条件. + """ + + def _check() -> bool: + return not condition() + + _check.__name__ = f"NOT({condition.__name__})" + return _check + + @staticmethod + def AND(*conditions: Condition) -> Condition: + """多个条件的逻辑与. + + Parameters + ---------- + *conditions : Condition + 条件列表. + + Returns + ------- + Condition + 组合条件. + """ + + def _check() -> bool: + return all(c() for c in conditions) + + names = [c.__name__ for c in conditions] + _check.__name__ = f"AND({', '.join(names)})" + return _check + + @staticmethod + def OR(*conditions: Condition) -> Condition: + """多个条件的逻辑或. + + Parameters + ---------- + *conditions : Condition + 条件列表. + + Returns + ------- + Condition + 组合条件. + """ + + def _check() -> bool: + return any(c() for c in conditions) + + names = [c.__name__ for c in conditions] + _check.__name__ = f"OR({', '.join(names)})" + return _check + + +# 导出常用条件 +IS_WINDOWS = BuiltinConditions.IS_WINDOWS +IS_LINUX = BuiltinConditions.IS_LINUX +IS_MACOS = BuiltinConditions.IS_MACOS +IS_POSIX = BuiltinConditions.IS_POSIX + +# 导入 os 用于环境变量检查 +import os # noqa: E402 diff --git a/src/pyflowx/context.py b/src/pyflowx/context.py index 443a6c8..4111904 100644 --- a/src/pyflowx/context.py +++ b/src/pyflowx/context.py @@ -72,7 +72,9 @@ def build_call_args( InjectionError 若必需参数无法满足,或静态 ``kwargs`` 与注入依赖名冲突。 """ - sig = inspect.signature(spec.fn) + # 使用 effective_fn 而不是 fn,以支持 cmd 参数 + fn = spec.effective_fn + sig = inspect.signature(fn) params = sig.parameters # 检测特殊参数类型。 @@ -160,7 +162,9 @@ def describe_injection(spec: TaskSpec[object]) -> str: 供 ``dry_run`` 使用,在不执行的情况下展示执行计划。 """ - sig = inspect.signature(spec.fn) + # 使用 effective_fn 而不是 fn,以支持 cmd 参数 + fn = spec.effective_fn + sig = inspect.signature(fn) # 确定哪些位置参数由 spec.args 填充。 positional_params = [ p diff --git a/src/pyflowx/executors.py b/src/pyflowx/executors.py index be33c4b..a47ca47 100644 --- a/src/pyflowx/executors.py +++ b/src/pyflowx/executors.py @@ -38,8 +38,8 @@ Strategy = str # "sequential" | "thread" | "async" def _is_async_fn(spec: TaskSpec[object]) -> bool: - """判断 ``spec.fn`` 是否为协程函数。""" - return inspect.iscoroutinefunction(spec.fn) + """判断 ``spec.effective_fn`` 是否为协程函数。""" + return inspect.iscoroutinefunction(spec.effective_fn) def _emit( @@ -92,6 +92,14 @@ def _run_sync_with_retry( ) -> TaskResult[object]: """执行同步任务并带重试;返回填充好的 TaskResult。""" result: TaskResult[object] = TaskResult(spec=spec) + + # 检查条件是否满足 + if spec.conditions and not spec.should_execute(): + result.status = TaskStatus.SKIPPED + result.finished_at = datetime.now() + logger.info("task %r skipped (条件不满足)", spec.name) + return result + result.started_at = datetime.now() max_attempts = spec.retries + 1 args, kwargs = build_call_args(spec, context) @@ -99,7 +107,7 @@ def _run_sync_with_retry( while True: result.attempts += 1 try: - result.value = spec.fn(*args, **kwargs) + result.value = spec.effective_fn(*args, **kwargs) result.status = TaskStatus.SUCCESS result.finished_at = datetime.now() return result @@ -118,6 +126,14 @@ async def _run_async_with_retry( ) -> TaskResult[object]: """在事件循环上执行任务(同步或异步)并带重试。""" result: TaskResult[object] = TaskResult(spec=spec) + + # 检查条件是否满足 + if spec.conditions and not spec.should_execute(): + result.status = TaskStatus.SKIPPED + result.finished_at = datetime.now() + logger.info("task %r skipped (条件不满足)", spec.name) + return result + result.started_at = datetime.now() max_attempts = spec.retries + 1 args, kwargs = build_call_args(spec, context) @@ -127,7 +143,7 @@ async def _run_async_with_retry( result.attempts += 1 try: if _is_async_fn(spec): - coro = cast(Awaitable[Any], spec.fn(*args, **kwargs)) + coro = cast(Awaitable[Any], spec.effective_fn(*args, **kwargs)) if spec.timeout is not None: result.value = await asyncio.wait_for(coro, timeout=spec.timeout) else: @@ -135,7 +151,7 @@ async def _run_async_with_retry( else: # 将同步工作卸载到线程,保持事件循环存活。 def fn_call() -> Any: - return spec.fn(*args, **kwargs) + return spec.effective_fn(*args, **kwargs) if spec.timeout is not None: result.value = await asyncio.wait_for( diff --git a/src/pyflowx/graph.py b/src/pyflowx/graph.py index d072f08..9974062 100644 --- a/src/pyflowx/graph.py +++ b/src/pyflowx/graph.py @@ -165,12 +165,15 @@ class Graph: TaskSpec( name=spec.name, fn=spec.fn, + cmd=spec.cmd, depends_on=pruned_deps, args=spec.args, kwargs=spec.kwargs, retries=spec.retries, timeout=spec.timeout, tags=spec.tags, + conditions=spec.conditions, + cwd=spec.cwd, ) ) return Graph.from_specs(kept) @@ -189,12 +192,15 @@ class Graph: TaskSpec( name=spec.name, fn=spec.fn, + cmd=spec.cmd, depends_on=pruned_deps, args=spec.args, kwargs=spec.kwargs, retries=spec.retries, timeout=spec.timeout, tags=spec.tags, + conditions=spec.conditions, + cwd=spec.cwd, ) ) return Graph.from_specs(kept) diff --git a/src/pyflowx/task.py b/src/pyflowx/task.py index 983b5a6..baacac4 100644 --- a/src/pyflowx/task.py +++ b/src/pyflowx/task.py @@ -15,16 +15,16 @@ * ``TaskStatus`` 是封闭枚举;执行器绝不发明临时字符串。 """ -from __future__ import annotations - from dataclasses import dataclass, field from datetime import datetime from enum import Enum +from pathlib import Path from typing import ( Any, Callable, Coroutine, Generic, + List, Mapping, Optional, Tuple, @@ -44,6 +44,16 @@ TaskFn = Union[ # 单任务类型由函数签名本身保留。 Context = Mapping[str, Any] +# 命令类型支持 +TaskCmd = Union[ + List[str], # 命令列表, 如 ["ls", "-la"] + str, # shell 命令字符串 + Callable[..., T], # Python 函数 +] + +# 条件判断函数类型 +Condition = Callable[[], bool] + class TaskStatus(Enum): """任务在单次运行内的生命周期状态。""" @@ -66,6 +76,13 @@ class TaskSpec(Generic[T]): fn: 待执行的可调用对象,可为同步或异步。其参数名驱动自动上下文 注入(见 :mod:`pyflowx.context`)。 + 若提供 ``cmd`` 参数,则此参数会被忽略。 + cmd: + 命令列表或 shell 字符串,支持三种形态: + - ``list[str]``: 命令及参数列表,如 ``["ls", "-la"]`` + - ``str``: shell 命令字符串,如 ``"pip freeze > requirements.txt"`` + - ``Callable``: Python 函数,与 ``fn`` 参数等效 + 若提供此参数,会自动包装为执行函数,覆盖 ``fn`` 参数。 depends_on: 必须先完成才能运行本任务的任务名列表。顺序无关;框架会做 拓扑排序。 @@ -83,16 +100,26 @@ class TaskSpec(Generic[T]): 取消 worker future。 tags: 自由标签,供 :meth:`Graph.subgraph` 做选择性执行与调试。 + conditions: + 条件判断函数列表,只有所有条件都返回 ``True`` 时才执行任务。 + 若任一条件返回 ``False``,任务会被标记为 SKIPPED。 + 用于平台判断、环境变量检查等场景。 + cwd: + 命令执行的工作目录,仅在使用 ``cmd`` 参数时有效。 + ``None`` 表示当前目录。 """ name: str - fn: TaskFn[T] + fn: Optional[TaskFn[T]] = None + cmd: Optional[TaskCmd] = None depends_on: Tuple[str, ...] = () args: Tuple[Any, ...] = () kwargs: Mapping[str, Any] = field(default_factory=dict) retries: int = 0 timeout: Optional[float] = None tags: Tuple[str, ...] = () + conditions: Tuple[Condition, ...] = () + cwd: Optional[Path] = None def __post_init__(self) -> None: if not self.name: @@ -103,6 +130,117 @@ class TaskSpec(Generic[T]): raise ValueError(f"TaskSpec '{self.name}': timeout must be > 0.") if self.name in self.depends_on: raise ValueError(f"TaskSpec '{self.name}' cannot depend on itself.") + if self.fn is None and self.cmd is None: + raise ValueError(f"TaskSpec '{self.name}': 必须提供 fn 或 cmd 参数。") + + @property + def effective_fn(self) -> TaskFn[T]: + """获取有效的执行函数. + + 若提供了 ``cmd`` 参数,则返回包装后的命令执行函数; + 否则返回 ``fn`` 参数。 + """ + if self.cmd is not None: + return self._wrap_cmd() + if self.fn is not None: + return self.fn + raise ValueError(f"TaskSpec '{self.name}': 没有可执行的函数或命令。") + + def _wrap_cmd(self) -> TaskFn[T]: + """将 cmd 包装为可执行函数. + + Returns + ------- + TaskFn[T] + 包装后的执行函数. + """ + cmd = self.cmd + cwd = self.cwd + timeout = self.timeout + + if isinstance(cmd, list): + + def _run_list() -> T: + import subprocess + + cmd_str = " ".join(str(arg) for arg in cmd) + try: + result = subprocess.run( + cmd, + cwd=cwd, + timeout=timeout, + capture_output=True, + text=True, + check=False, + ) + except FileNotFoundError: + raise RuntimeError(f"命令未找到: {cmd_str}") + except subprocess.TimeoutExpired: + raise RuntimeError(f"命令执行超时: {cmd_str} ({timeout}s)") + except OSError as e: + raise RuntimeError(f"命令执行异常: {cmd_str}: {e}") + + if result.returncode == 0: + return None # type: ignore[return-value] + + err_msg = f"命令执行失败: `{cmd_str}`, 返回码: {result.returncode}" + if result.stderr.strip(): + err_msg += f"\n{result.stderr.strip()}" + raise RuntimeError(err_msg) + + _run_list.__name__ = self.name + return _run_list # type: ignore[return-value] + + if isinstance(cmd, str): + + def _run_shell() -> T: + import subprocess + + try: + result = subprocess.run( + cmd, + shell=True, + cwd=cwd, + timeout=timeout, + capture_output=True, + text=True, + check=False, + ) + except FileNotFoundError: + raise RuntimeError(f"Shell 命令未找到: {cmd}") + except subprocess.TimeoutExpired: + raise RuntimeError(f"Shell 命令执行超时: {cmd} ({timeout}s)") + except OSError as e: + raise RuntimeError(f"Shell 命令执行异常: {cmd}: {e}") + + if result.returncode == 0: + return None # type: ignore[return-value] + + err_msg = f"Shell 命令执行失败: `{cmd}`, 返回码: {result.returncode}" + if result.stderr.strip(): + err_msg += f"\n{result.stderr.strip()}" + raise RuntimeError(err_msg) + + _run_shell.__name__ = self.name + return _run_shell # type: ignore[return-value] + + if callable(cmd): + return cmd # type: ignore[return-value] + + raise TypeError( + f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}" + ) + + def should_execute(self) -> bool: + """检查任务是否应该执行. + + Returns + ------- + bool + 若所有条件都返回 ``True``,则返回 ``True``; + 否则返回 ``False``。 + """ + return all(condition() for condition in self.conditions) @dataclass diff --git a/tests/test_taskspec_commands.py b/tests/test_taskspec_commands.py new file mode 100644 index 0000000..553f557 --- /dev/null +++ b/tests/test_taskspec_commands.py @@ -0,0 +1,352 @@ +"""测试 TaskSpec 的命令和条件执行功能.""" + +import sys +from pathlib import Path + +import pytest + +import pyflowx as px +from pyflowx.conditions import ( + IS_LINUX, + IS_MACOS, + IS_WINDOWS, + BuiltinConditions, +) + +# 跨平台的 echo 命令 +if sys.platform == "win32": + ECHO_CMD = ["cmd", "/c", "echo"] +else: + ECHO_CMD = ["echo"] + + +def test_taskspec_with_cmd_list(): + """测试使用命令列表的 TaskSpec.""" + graph = px.Graph.from_specs( + [ + px.TaskSpec("echo_test", cmd=[*ECHO_CMD, "hello"]), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + assert "echo_test" in report.results + assert report.results["echo_test"].status == px.TaskStatus.SUCCESS + + +def test_taskspec_with_cmd_string(): + """测试使用 shell 命令字符串的 TaskSpec.""" + if sys.platform == "win32": + shell_cmd = 'cmd /c "echo hello from shell"' + else: + shell_cmd = "echo 'hello from shell'" + + graph = px.Graph.from_specs( + [ + px.TaskSpec("shell_test", cmd=shell_cmd), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + assert "shell_test" in report.results + assert report.results["shell_test"].status == px.TaskStatus.SUCCESS + + +def test_taskspec_with_conditions_skip(): + """测试条件不满足时任务被跳过.""" + + # 创建一个永远不会满足的条件 + def never_true(): + return False + + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "should_skip", + cmd=[*ECHO_CMD, "this should not run"], + conditions=(never_true,), + ), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + assert "should_skip" in report.results + assert report.results["should_skip"].status == px.TaskStatus.SKIPPED + + +def test_taskspec_with_conditions_execute(): + """测试条件满足时任务正常执行.""" + + # 创建一个总是满足的条件 + def always_true(): + return True + + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "should_run", + cmd=[*ECHO_CMD, "this should run"], + conditions=(always_true,), + ), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + assert "should_run" in report.results + assert report.results["should_run"].status == px.TaskStatus.SUCCESS + + +def test_platform_conditions(): + """测试平台条件.""" + if sys.platform == "win32": + win_cmd = ["cmd", "/c", "echo", "Windows"] + posix_cmd = ["echo", "POSIX"] + else: + win_cmd = ["echo", "Windows"] + posix_cmd = ["echo", "POSIX"] + + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "win_task", + cmd=win_cmd, + conditions=(IS_WINDOWS,), + ), + px.TaskSpec( + "linux_task", + cmd=posix_cmd, + conditions=(IS_LINUX,), + ), + px.TaskSpec( + "macos_task", + cmd=posix_cmd, + conditions=(IS_MACOS,), + ), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + + # 检查只有当前平台的任务执行了 + if sys.platform == "win32": + assert report.results["win_task"].status == px.TaskStatus.SUCCESS + assert report.results["linux_task"].status == px.TaskStatus.SKIPPED + assert report.results["macos_task"].status == px.TaskStatus.SKIPPED + elif sys.platform == "linux": + assert report.results["win_task"].status == px.TaskStatus.SKIPPED + assert report.results["linux_task"].status == px.TaskStatus.SUCCESS + assert report.results["macos_task"].status == px.TaskStatus.SKIPPED + elif sys.platform == "darwin": + assert report.results["win_task"].status == px.TaskStatus.SKIPPED + assert report.results["linux_task"].status == px.TaskStatus.SKIPPED + assert report.results["macos_task"].status == px.TaskStatus.SUCCESS + + +def test_app_installed_conditions(): + """测试应用安装条件.""" + # 测试 python 应该总是安装的 + if sys.platform == "win32": + python_cmd = ["python", "--version"] + else: + python_cmd = ["python3", "--version"] + + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "python_check", + cmd=python_cmd, + conditions=(BuiltinConditions.HAS_APP_INSTALLED("python"),), + ), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + assert "python_check" in report.results + # python 应该总是安装的 + assert report.results["python_check"].status == px.TaskStatus.SUCCESS + + +def test_combined_conditions(): + """测试组合条件.""" + # AND 条件 + and_condition = BuiltinConditions.AND( + lambda: True, + lambda: True, + ) + + # OR 条件 + or_condition = BuiltinConditions.OR( + lambda: True, + lambda: False, + ) + + # NOT 条件 + not_condition = BuiltinConditions.NOT(lambda: False) + + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "and_test", + cmd=[*ECHO_CMD, "AND"], + conditions=(and_condition,), + ), + px.TaskSpec( + "or_test", + cmd=[*ECHO_CMD, "OR"], + conditions=(or_condition,), + ), + px.TaskSpec( + "not_test", + cmd=[*ECHO_CMD, "NOT"], + conditions=(not_condition,), + ), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + assert report.results["and_test"].status == px.TaskStatus.SUCCESS + assert report.results["or_test"].status == px.TaskStatus.SUCCESS + assert report.results["not_test"].status == px.TaskStatus.SUCCESS + + +def test_taskspec_with_cwd(): + """测试工作目录设置.""" + if sys.platform == "win32": + ls_cmd = ["cmd", "/c", "dir"] + else: + ls_cmd = ["ls", "-la"] + + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "list_current", + cmd=ls_cmd, + cwd=Path.cwd(), + ), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + assert "list_current" in report.results + assert report.results["list_current"].status == px.TaskStatus.SUCCESS + + +def test_taskspec_with_timeout(): + """测试超时设置.""" + graph = px.Graph.from_specs( + [ + # 短时间任务应该成功 + px.TaskSpec( + "short_task", + cmd=["python", "-c", "import time; time.sleep(0.1)"], + timeout=1.0, + ), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + assert "short_task" in report.results + assert report.results["short_task"].status == px.TaskStatus.SUCCESS + + +def test_taskspec_dependency_with_conditions(): + """测试依赖和条件的组合.""" + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "first", + cmd=[*ECHO_CMD, "first"], + conditions=(lambda: True,), + ), + px.TaskSpec( + "second", + cmd=[*ECHO_CMD, "second"], + depends_on=("first",), + conditions=(lambda: True,), + ), + px.TaskSpec( + "third", + cmd=[*ECHO_CMD, "third"], + depends_on=("second",), + ), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + assert report.results["first"].status == px.TaskStatus.SUCCESS + assert report.results["second"].status == px.TaskStatus.SUCCESS + assert report.results["third"].status == px.TaskStatus.SUCCESS + + +def test_taskspec_mixed_fn_and_cmd(): + """测试混合使用 fn 和 cmd.""" + + def my_function(): + return "result from function" + + graph = px.Graph.from_specs( + [ + px.TaskSpec("fn_task", fn=my_function), + px.TaskSpec("cmd_task", cmd=[*ECHO_CMD, "from command"]), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + assert report.results["fn_task"].status == px.TaskStatus.SUCCESS + assert report.results["fn_task"].value == "result from function" + assert report.results["cmd_task"].status == px.TaskStatus.SUCCESS + + +def test_taskspec_cmd_overrides_fn(): + """测试 cmd 参数优先于 fn 参数.""" + + def my_function(): + return "should not run" + + graph = px.Graph.from_specs( + [ + px.TaskSpec( + "cmd_priority", + fn=my_function, + cmd=[*ECHO_CMD, "cmd takes priority"], + ), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + assert report.results["cmd_priority"].status == px.TaskStatus.SUCCESS + # cmd 应该被执行,而不是 fn + assert report.results["cmd_priority"].value is None + + +def test_taskspec_callable_cmd(): + """测试 cmd 参数使用可调用对象.""" + + def my_callable(): + return "callable result" + + graph = px.Graph.from_specs( + [ + px.TaskSpec("callable_cmd", cmd=my_callable), + ] + ) + + report = px.run(graph, strategy="sequential") + assert report.success + assert report.results["callable_cmd"].status == px.TaskStatus.SUCCESS + assert report.results["callable_cmd"].value == "callable result" + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/uv.lock b/uv.lock index 8d28ef9..b407cb6 100644 --- a/uv.lock +++ b/uv.lock @@ -2193,7 +2193,7 @@ wheels = [ [[package]] name = "pyflowx" -version = "0.1.1" +version = "0.1.2" source = { editable = "." } dependencies = [ { name = "graphlib-backport", marker = "python_full_version < '3.9'" },