feat: 添加命令行任务支持与条件执行功能

1. 新增条件判断模块,支持平台、环境变量、应用安装等条件检查
2. 扩展TaskSpec支持cmd参数,可直接执行shell命令或包装Python函数
3. 添加任务条件执行、工作目录设置功能
4. 重构任务执行逻辑,使用effective_fn统一处理函数与命令
5. 新增完整的命令行构建工具pymake
6. 新增配套测试用例覆盖命令执行与条件逻辑
7. 更新项目版本至0.1.2,调整入口脚本为pymake
This commit is contained in:
2026-06-20 16:29:25 +08:00
parent 3bbdf142ba
commit fad964b370
11 changed files with 1241 additions and 13 deletions
+1 -1
View File
@@ -20,7 +20,7 @@ requires-python = ">=3.8"
version = "0.1.2" version = "0.1.2"
[project.scripts] [project.scripts]
pyflowx-demo = "pyflowx.__main__:main" pymake = "pyflowx.cli.pymake:main"
[project.optional-dependencies] [project.optional-dependencies]
dev = [ dev = [
+44 -1
View File
@@ -22,10 +22,44 @@
]) ])
report = px.run(graph, strategy="sequential") report = px.run(graph, strategy="sequential")
print(report["double"]) # [2, 4, 6] print(report["double"]) # [2, 4, 6]
命令行任务示例
--------------
import pyflowx as px
from pyflowx.conditions import IS_WINDOWS, BuiltinConditions
graph = px.Graph.from_specs([
# 使用命令列表
px.TaskSpec("list_files", cmd=["ls", "-la"]),
# 使用 shell 命令
px.TaskSpec("check_git", cmd="git status"),
# 条件执行:仅在 Windows 上运行
px.TaskSpec(
"win_only",
cmd=["dir"],
conditions=(IS_WINDOWS,)
),
# 条件执行:仅在 git 已安装时运行
px.TaskSpec(
"git_check",
cmd=["git", "--version"],
conditions=(BuiltinConditions.HAS_APP_INSTALLED("git"),)
),
])
report = px.run(graph)
""" """
from __future__ import annotations from __future__ import annotations
from .conditions import (
BuiltinConditions,
Constants,
Condition,
IS_LINUX,
IS_MACOS,
IS_POSIX,
IS_WINDOWS,
)
from .context import Context, build_call_args, describe_injection from .context import Context, build_call_args, describe_injection
from .errors import ( from .errors import (
CycleError, CycleError,
@@ -41,7 +75,7 @@ from .executors import run
from .graph import Graph from .graph import Graph
from .report import RunReport from .report import RunReport
from .storage import JSONBackend, MemoryBackend, StateBackend from .storage import JSONBackend, MemoryBackend, StateBackend
from .task import TaskEvent, TaskResult, TaskSpec, TaskStatus from .task import TaskCmd, TaskEvent, TaskResult, TaskSpec, TaskStatus
__version__ = "0.1.2" __version__ = "0.1.2"
@@ -52,6 +86,7 @@ __all__ = [
"TaskResult", "TaskResult",
"TaskEvent", "TaskEvent",
"Context", "Context",
"TaskCmd",
"Graph", "Graph",
"RunReport", "RunReport",
# 执行 # 执行
@@ -69,6 +104,14 @@ __all__ = [
"TaskTimeoutError", "TaskTimeoutError",
"InjectionError", "InjectionError",
"StorageError", "StorageError",
# 条件判断
"Condition",
"Constants",
"BuiltinConditions",
"IS_WINDOWS",
"IS_LINUX",
"IS_MACOS",
"IS_POSIX",
# 辅助(高级) # 辅助(高级)
"build_call_args", "build_call_args",
"describe_injection", "describe_injection",
View File
+444
View File
@@ -0,0 +1,444 @@
"""Python 构建工具模块.
完全替代传统的 Makefile,
提供更好的跨平台兼容性和 Python 生态集成.
"""
from __future__ import annotations
from pathlib import Path
import pyflowx as px
class PymakeConfig:
"""PyMake 配置类."""
# 项目根目录
PROJECT_ROOT: str = str(Path(__file__).parent.parent.parent.parent)
CORE_DIR: str = f"{PROJECT_ROOT}/bitool-core"
CORE_PATTERN: str = f"{CORE_DIR}/target/bitool_core-*-cp*.whl"
TIMEOUT: int = 600
# Python 构建
BUILD_TOOL: str = "uv"
BUILD_COMMAND: list[str] = [BUILD_TOOL, "build"]
# Rust 构建 (maturin)
MATURIN_TOOL: str = "maturin"
MATURIN_BUILD_COMMAND: list[str] = ["maturin", "build", "-r"]
MATURIN_DEV_COMMAND: list[str] = ["maturin", "develop"]
MATURIN_BUILD_OPTIONS_WIN7: list[str] = [
"--target",
"x86_64-win7-windows-msvc",
"-Zbuild-std",
"-i",
"python3.8",
]
# 文档
DOC_BUILD_TOOL: str = "sphinx-build"
DOC_BUILD_COMMAND: list[str] = ["sphinx-build", "-b", "html", "docs", "docs/_build"]
# 清理
DIRS_TO_IGNORE: list[str] = [".venv"]
PYTHON_BUILD_DIRS: list[str] = ["dist", "build", "*.egg-info", "src/*.egg-info"]
conf = PymakeConfig()
def main():
"""
╔══════════════════════════════════════════════════════════╗
║ PyMake 构建工具 ║
╚══════════════════════════════════════════════════════════╝
🔨 构建命令:
pymake b - 构建 Python 主包 (uv build)
pymake bc - 构建 Rust 核心模块 (maturin build)
pymake ba - 构建所有包 (先 Rust 后 Python)
📦 安装命令 (开发模式):
pymake ic - 安装 Rust 核心模块 (maturin develop)
pymake ip - 安装 Python 主包 (uv pip install -e .)
pymake ia - 安装所有包 (开发模式,推荐)
🧹 清理命令:
pymake cp - 清理 Python 构建产物
pymake cc - 清理 Rust 构建产物 (cargo clean)
pymake ca - 清理所有构建产物
🛠️ 开发工具:
pymake t - 运行测试 (pytest)
pymake tc - 运行测试并生成覆盖率报告
pymake lint - 代码格式化与检查 (ruff)
pymake typecheck - 类型检查 (ty)
pymake doc - 构建文档 (sphinx)
🔬 多版本测试:
pymake tox - 多版本 Python 测试 (3.8-3.14)
pymake tox-install - 安装所有 Python 版本 (仅安装不测试)
📦 发布命令:
pymake pb - 发布到 PyPI (先 Rust 后 Python)
💡 常用工作流:
1. 初始化开发环境: pymake ia
2. 日常开发: pymake lint && pymake t
3. 构建发布包: pymake ba
4. 多版本兼容性测试: pymake tox
5. 发布到 PyPI: pymake pb
6. 清理重新开始: pymake ca && pymake ia
📝 示例:
pymake ba # 构建所有包
pymake ia # 安装开发环境
pymake t # 运行测试
pymake tox # 多版本兼容性测试
pymake lint # 格式化代码
pymake ca # 清理所有构建产物
"""
pymake_graph = px.Graph.from_specs(
[
px.TaskSpec("b", cmd=conf.BUILD_COMMAND),
px.TaskSpec("bc", cmd=conf.MATURIN_BUILD_COMMAND),
px.TaskSpec("ic", cmd=conf.DOC_BUILD_COMMAND),
]
)
px.run(pymake_graph)
# class PyMakeSkill(MapSkill):
# """PyMake 构建技能."""
# name: ClassVar[str] = "pymake"
# description: ClassVar[str] = "Bitool PyMake - Python构建工具"
# @override
# def create_scheduler_map(
# self,
# args: argparse.Namespace,
# ) -> dict[str, CommandScheduler] | None:
# return {
# # === 构建命令 ===
# # 构建 Python 包
# "b": CommandScheduler(
# commands=[
# RunCommand(
# cmd=conf.BUILD_COMMAND,
# allow_conditions=[_UV_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # 构建 Rust 核心模块
# "bc": CommandScheduler(
# commands=[
# RunCommand(
# cmd=_get_maturin_build_command(),
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # 构建双包(先 Rust 后 Python
# "ba": CommandScheduler(
# commands=[
# RunCommand(
# name="maturin_build",
# cmd=_get_maturin_build_command(),
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# RunCommand(
# name="uv_build",
# cmd=conf.BUILD_COMMAND,
# allow_conditions=[_UV_CONDITION],
# timeout=conf.TIMEOUT,
# dependencies=["maturin_build"],
# ),
# ],
# ),
# # === 安装命令(开发模式) ===
# # 安装 Rust 核心模块
# "ic": CommandScheduler(
# commands=[
# RunCommand(
# cmd=conf.MATURIN_DEV_COMMAND,
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# ),
# ],
# ),
# # 安装 Python 主包
# "ip": CommandScheduler(
# commands=[
# RunCommand(
# cmd=["uv", "pip", "install", "-e", "."],
# allow_conditions=[_UV_CONDITION],
# success_codes={0, 2},
# ),
# ],
# ),
# # 安装双包(开发模式)
# "ia": CommandScheduler(
# commands=[
# RunCommand(
# cmd=conf.MATURIN_DEV_COMMAND,
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# ),
# RunCommand(
# cmd=["uv", "pip", "install", "-e", "."],
# allow_conditions=[_UV_CONDITION],
# success_codes={0, 2},
# ),
# ],
# ),
# # === 清理命令 ===
# # 清理 Python 构建产物
# "cp": CommandScheduler(
# commands=[
# RunCommand(
# cmd=["rm", "-rf", *conf.PYTHON_BUILD_DIRS],
# allow_conditions=[_GIT_CONDITION], # 使用 git clean 更安全
# ),
# ],
# ),
# # 清理 Rust 构建产物
# "cc": CommandScheduler(
# commands=[
# RunCommand(
# cmd=["cargo", "clean"],
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[
# _MATURIN_CONDITION,
# ], # 有 maturin 说明有 cargo
# ),
# ],
# ),
# # 清理所有构建产物
# "ca": CommandScheduler(
# commands=[
# RunCommand(
# cmd=["cargo", "clean"],
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# ),
# RunCommand(
# cmd=["git", "clean", "-xfd", "-e", *conf.DIRS_TO_IGNORE],
# allow_conditions=[_GIT_CONDITION],
# ),
# ],
# ),
# # === 开发工具 ===
# # 运行测试, 跳过 slow, 并行模式
# "t": CommandScheduler(
# commands=[
# RunCommand(
# cmd=[
# "pytest",
# "-m",
# "not slow",
# "-n",
# "8",
# "--dist",
# "loadfile",
# "--color=yes",
# "--durations=10",
# ],
# allow_conditions=[_PYTEST_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # 运行测试, 非并行模式
# "tf": CommandScheduler(
# commands=[
# RunCommand(
# cmd=[
# "pytest",
# "-m",
# "not slow",
# "--dist",
# "loadfile",
# "--color=yes",
# "--durations=10",
# ],
# allow_conditions=[_PYTEST_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # 运行测试并生成覆盖率报告, 跳过 slow, 并行模式
# # --dist loadfile: 按文件分发测试, 减少模块导入开销
# "tc": CommandScheduler(
# commands=[
# RunCommand(
# cmd=[
# "pytest",
# "-m",
# "not slow",
# "--cov",
# "-n",
# "auto",
# "--dist",
# "loadfile",
# "--tb=short",
# "-v",
# "--color=yes",
# "--durations=10",
# ],
# allow_conditions=[_PYTEST_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # 代码格式化与检查
# "lint": CommandScheduler(
# commands=[
# RunCommand(
# cmd=[
# "ruff",
# "check",
# "--fix",
# "--unsafe-fixes",
# ],
# allow_conditions=[_RUFF_CONDITION],
# timeout=conf.TIMEOUT,
# cwd=Path(conf.PROJECT_ROOT),
# ),
# ],
# ),
# # 类型检查
# "typecheck": CommandScheduler(
# commands=[
# RunCommand(
# cmd=["ty", "check", "src/bitool"],
# allow_conditions=[BuiltinConditions.HAS_APP_INSTALLED("ty")],
# ),
# ],
# ),
# # 构建文档
# "doc": CommandScheduler(
# commands=[
# RunCommand(
# cmd=conf.DOC_BUILD_COMMAND,
# allow_conditions=[
# BuiltinConditions.HAS_APP_INSTALLED(conf.DOC_BUILD_TOOL),
# ],
# ),
# ],
# ),
# # 发布到 PyPI(先发布 Rust 核心模块,再发布 Python 主包)
# "pb": CommandScheduler(
# commands=[
# # 发布 Python 主包(在项目根目录执行,依赖 Rust 发布成功)
# RunCommand(
# name="publish-python",
# cmd=["hatch", "publish"],
# cwd=Path(conf.PROJECT_ROOT),
# allow_conditions=[_HATCH_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# "pba": CommandScheduler(
# commands=[
# # 发布 Rust 核心模块(在 core 目录执行)
# RunCommand(
# name="publish-rust",
# # --disable-progress-bar: 避免 Windows GBK 控制台渲染 rich 进度条
# # 中的 \u2022 字符导致 UnicodeEncodeError
# cmd=[
# "twine",
# "upload",
# "--disable-progress-bar",
# conf.CORE_PATTERN,
# ],
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# RunCommand(
# name="publish-python",
# cmd=["hatch", "publish"],
# cwd=Path(conf.PROJECT_ROOT),
# allow_conditions=[_HATCH_CONDITION],
# timeout=conf.TIMEOUT,
# dependencies=["publish-rust"],
# ),
# ],
# ),
# "pbc": CommandScheduler(
# commands=[
# # 发布 Rust 核心模块(在 core 目录执行)
# RunCommand(
# name="publish-rust",
# cmd=["maturin", "publish"],
# cwd=Path(conf.CORE_DIR),
# allow_conditions=[_MATURIN_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # === 多版本测试命令 ===
# # 运行多版本 Python 测试 (tox)
# "tox": CommandScheduler(
# commands=[
# RunCommand(
# cmd=["tox", "-p", "auto"],
# allow_conditions=[_TOX_CONDITION],
# timeout=conf.TIMEOUT,
# ),
# ],
# ),
# # 安装多版本 Python (仅安装不测试)
# "tox-install": CommandScheduler(
# commands=[
# RunCommand(
# cmd=[
# "uv",
# "python",
# "install",
# "3.8",
# "3.9",
# "3.10",
# "3.11",
# "3.12",
# "3.13",
# "3.14",
# ],
# allow_conditions=[_UV_CONDITION],
# timeout=600,
# ),
# ],
# ),
# }
# def _get_maturin_build_command() -> list[str]:
# """获取 maturin 构建命令(根据平台自动添加参数).
# Returns
# -------
# list[str]
# 完整的 maturin 构建命令列表.
# """
# base_cmd = conf.MATURIN_BUILD_COMMAND.copy()
# if Constants.IS_WINDOWS:
# base_cmd.extend(conf.MATURIN_BUILD_OPTIONS_WIN7)
# return base_cmd
# # 命令条件判断
# _MATURIN_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.MATURIN_TOOL)
# _PYTEST_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("pytest")
# _UV_CONDITION = BuiltinConditions.HAS_APP_INSTALLED(conf.BUILD_TOOL)
# _HATCH_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("hatch")
# _RUFF_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("ruff")
# _GIT_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("git")
# _TOX_CONDITION = BuiltinConditions.HAS_APP_INSTALLED("tox")
+225
View File
@@ -0,0 +1,225 @@
"""条件判断模块.
提供平台条件、应用安装条件等预定义条件判断函数,
用于 TaskSpec 的条件执行功能.
"""
from __future__ import annotations
import shutil
import sys
from typing import Callable, Optional
# 条件判断函数类型
Condition = Callable[[], bool]
class Constants:
"""常量定义."""
IS_WINDOWS: bool = sys.platform == "win32"
IS_LINUX: bool = sys.platform == "linux"
IS_MACOS: bool = sys.platform == "darwin"
IS_POSIX: bool = sys.platform != "win32"
class BuiltinConditions:
"""内置条件判断函数集合."""
@staticmethod
def IS_WINDOWS() -> bool:
"""是否为 Windows 平台."""
return Constants.IS_WINDOWS
@staticmethod
def IS_LINUX() -> bool:
bool = Constants.IS_LINUX
return bool
@staticmethod
def IS_MACOS() -> bool:
"""是否为 macOS 平台."""
return Constants.IS_MACOS
@staticmethod
def IS_POSIX() -> bool:
"""是否为 POSIX 系统 (Linux/macOS)."""
return Constants.IS_POSIX
@staticmethod
def PYTHON_VERSION(major: int, minor: Optional[int] = None) -> bool:
"""检查 Python 版本是否匹配.
Parameters
----------
major : int
主版本号.
minor : int | None
次版本号, 若为 None 则仅检查主版本.
Returns
-------
bool
版本是否匹配.
"""
if minor is None:
return sys.version_info.major == major
return sys.version_info.major == major and sys.version_info.minor == minor
@staticmethod
def PYTHON_VERSION_AT_LEAST(major: int, minor: int = 0) -> bool:
"""检查 Python 版本是否 >= 指定版本.
Parameters
----------
major : int
主版本号.
minor : int
次版本号.
Returns
-------
bool
当前版本是否 >= 指定版本.
"""
return sys.version_info >= (major, minor)
@staticmethod
def HAS_APP_INSTALLED(app_name: str) -> Condition:
"""检查指定应用是否已安装.
Parameters
----------
app_name : str
应用名称 (如 "git", "python", "pytest").
Returns
-------
Condition
条件判断函数.
"""
def _check() -> bool:
return shutil.which(app_name) is not None
_check.__name__ = f"HAS_APP_INSTALLED({app_name!r})"
return _check
@staticmethod
def ENV_VAR_EXISTS(var_name: str) -> Condition:
"""检查环境变量是否存在.
Parameters
----------
var_name : str
环境变量名.
Returns
-------
Condition
条件判断函数.
"""
def _check() -> bool:
return var_name in os.environ
_check.__name__ = f"ENV_VAR_EXISTS({var_name!r})"
return _check
@staticmethod
def ENV_VAR_EQUALS(var_name: str, value: str) -> Condition:
"""检查环境变量是否等于指定值.
Parameters
----------
var_name : str
环境变量名.
value : str
期望的值.
Returns
-------
Condition
条件判断函数.
"""
def _check() -> bool:
return os.environ.get(var_name) == value
_check.__name__ = f"ENV_VAR_EQUALS({var_name!r}, {value!r})"
return _check
@staticmethod
def NOT(condition: Condition) -> Condition:
"""对条件取反.
Parameters
----------
condition : Condition
原始条件.
Returns
-------
Condition
取反后的条件.
"""
def _check() -> bool:
return not condition()
_check.__name__ = f"NOT({condition.__name__})"
return _check
@staticmethod
def AND(*conditions: Condition) -> Condition:
"""多个条件的逻辑与.
Parameters
----------
*conditions : Condition
条件列表.
Returns
-------
Condition
组合条件.
"""
def _check() -> bool:
return all(c() for c in conditions)
names = [c.__name__ for c in conditions]
_check.__name__ = f"AND({', '.join(names)})"
return _check
@staticmethod
def OR(*conditions: Condition) -> Condition:
"""多个条件的逻辑或.
Parameters
----------
*conditions : Condition
条件列表.
Returns
-------
Condition
组合条件.
"""
def _check() -> bool:
return any(c() for c in conditions)
names = [c.__name__ for c in conditions]
_check.__name__ = f"OR({', '.join(names)})"
return _check
# 导出常用条件
IS_WINDOWS = BuiltinConditions.IS_WINDOWS
IS_LINUX = BuiltinConditions.IS_LINUX
IS_MACOS = BuiltinConditions.IS_MACOS
IS_POSIX = BuiltinConditions.IS_POSIX
# 导入 os 用于环境变量检查
import os # noqa: E402
+6 -2
View File
@@ -72,7 +72,9 @@ def build_call_args(
InjectionError InjectionError
若必需参数无法满足,或静态 ``kwargs`` 与注入依赖名冲突。 若必需参数无法满足,或静态 ``kwargs`` 与注入依赖名冲突。
""" """
sig = inspect.signature(spec.fn) # 使用 effective_fn 而不是 fn,以支持 cmd 参数
fn = spec.effective_fn
sig = inspect.signature(fn)
params = sig.parameters params = sig.parameters
# 检测特殊参数类型。 # 检测特殊参数类型。
@@ -160,7 +162,9 @@ def describe_injection(spec: TaskSpec[object]) -> str:
供 ``dry_run`` 使用,在不执行的情况下展示执行计划。 供 ``dry_run`` 使用,在不执行的情况下展示执行计划。
""" """
sig = inspect.signature(spec.fn) # 使用 effective_fn 而不是 fn,以支持 cmd 参数
fn = spec.effective_fn
sig = inspect.signature(fn)
# 确定哪些位置参数由 spec.args 填充。 # 确定哪些位置参数由 spec.args 填充。
positional_params = [ positional_params = [
p p
+21 -5
View File
@@ -38,8 +38,8 @@ Strategy = str # "sequential" | "thread" | "async"
def _is_async_fn(spec: TaskSpec[object]) -> bool: def _is_async_fn(spec: TaskSpec[object]) -> bool:
"""判断 ``spec.fn`` 是否为协程函数。""" """判断 ``spec.effective_fn`` 是否为协程函数。"""
return inspect.iscoroutinefunction(spec.fn) return inspect.iscoroutinefunction(spec.effective_fn)
def _emit( def _emit(
@@ -92,6 +92,14 @@ def _run_sync_with_retry(
) -> TaskResult[object]: ) -> TaskResult[object]:
"""执行同步任务并带重试;返回填充好的 TaskResult。""" """执行同步任务并带重试;返回填充好的 TaskResult。"""
result: TaskResult[object] = TaskResult(spec=spec) result: TaskResult[object] = TaskResult(spec=spec)
# 检查条件是否满足
if spec.conditions and not spec.should_execute():
result.status = TaskStatus.SKIPPED
result.finished_at = datetime.now()
logger.info("task %r skipped (条件不满足)", spec.name)
return result
result.started_at = datetime.now() result.started_at = datetime.now()
max_attempts = spec.retries + 1 max_attempts = spec.retries + 1
args, kwargs = build_call_args(spec, context) args, kwargs = build_call_args(spec, context)
@@ -99,7 +107,7 @@ def _run_sync_with_retry(
while True: while True:
result.attempts += 1 result.attempts += 1
try: try:
result.value = spec.fn(*args, **kwargs) result.value = spec.effective_fn(*args, **kwargs)
result.status = TaskStatus.SUCCESS result.status = TaskStatus.SUCCESS
result.finished_at = datetime.now() result.finished_at = datetime.now()
return result return result
@@ -118,6 +126,14 @@ async def _run_async_with_retry(
) -> TaskResult[object]: ) -> TaskResult[object]:
"""在事件循环上执行任务(同步或异步)并带重试。""" """在事件循环上执行任务(同步或异步)并带重试。"""
result: TaskResult[object] = TaskResult(spec=spec) result: TaskResult[object] = TaskResult(spec=spec)
# 检查条件是否满足
if spec.conditions and not spec.should_execute():
result.status = TaskStatus.SKIPPED
result.finished_at = datetime.now()
logger.info("task %r skipped (条件不满足)", spec.name)
return result
result.started_at = datetime.now() result.started_at = datetime.now()
max_attempts = spec.retries + 1 max_attempts = spec.retries + 1
args, kwargs = build_call_args(spec, context) args, kwargs = build_call_args(spec, context)
@@ -127,7 +143,7 @@ async def _run_async_with_retry(
result.attempts += 1 result.attempts += 1
try: try:
if _is_async_fn(spec): if _is_async_fn(spec):
coro = cast(Awaitable[Any], spec.fn(*args, **kwargs)) coro = cast(Awaitable[Any], spec.effective_fn(*args, **kwargs))
if spec.timeout is not None: if spec.timeout is not None:
result.value = await asyncio.wait_for(coro, timeout=spec.timeout) result.value = await asyncio.wait_for(coro, timeout=spec.timeout)
else: else:
@@ -135,7 +151,7 @@ async def _run_async_with_retry(
else: else:
# 将同步工作卸载到线程,保持事件循环存活。 # 将同步工作卸载到线程,保持事件循环存活。
def fn_call() -> Any: def fn_call() -> Any:
return spec.fn(*args, **kwargs) return spec.effective_fn(*args, **kwargs)
if spec.timeout is not None: if spec.timeout is not None:
result.value = await asyncio.wait_for( result.value = await asyncio.wait_for(
+6
View File
@@ -165,12 +165,15 @@ class Graph:
TaskSpec( TaskSpec(
name=spec.name, name=spec.name,
fn=spec.fn, fn=spec.fn,
cmd=spec.cmd,
depends_on=pruned_deps, depends_on=pruned_deps,
args=spec.args, args=spec.args,
kwargs=spec.kwargs, kwargs=spec.kwargs,
retries=spec.retries, retries=spec.retries,
timeout=spec.timeout, timeout=spec.timeout,
tags=spec.tags, tags=spec.tags,
conditions=spec.conditions,
cwd=spec.cwd,
) )
) )
return Graph.from_specs(kept) return Graph.from_specs(kept)
@@ -189,12 +192,15 @@ class Graph:
TaskSpec( TaskSpec(
name=spec.name, name=spec.name,
fn=spec.fn, fn=spec.fn,
cmd=spec.cmd,
depends_on=pruned_deps, depends_on=pruned_deps,
args=spec.args, args=spec.args,
kwargs=spec.kwargs, kwargs=spec.kwargs,
retries=spec.retries, retries=spec.retries,
timeout=spec.timeout, timeout=spec.timeout,
tags=spec.tags, tags=spec.tags,
conditions=spec.conditions,
cwd=spec.cwd,
) )
) )
return Graph.from_specs(kept) return Graph.from_specs(kept)
+141 -3
View File
@@ -15,16 +15,16 @@
* ``TaskStatus`` 是封闭枚举;执行器绝不发明临时字符串。 * ``TaskStatus`` 是封闭枚举;执行器绝不发明临时字符串。
""" """
from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from pathlib import Path
from typing import ( from typing import (
Any, Any,
Callable, Callable,
Coroutine, Coroutine,
Generic, Generic,
List,
Mapping, Mapping,
Optional, Optional,
Tuple, Tuple,
@@ -44,6 +44,16 @@ TaskFn = Union[
# 单任务类型由函数签名本身保留。 # 单任务类型由函数签名本身保留。
Context = Mapping[str, Any] Context = Mapping[str, Any]
# 命令类型支持
TaskCmd = Union[
List[str], # 命令列表, 如 ["ls", "-la"]
str, # shell 命令字符串
Callable[..., T], # Python 函数
]
# 条件判断函数类型
Condition = Callable[[], bool]
class TaskStatus(Enum): class TaskStatus(Enum):
"""任务在单次运行内的生命周期状态。""" """任务在单次运行内的生命周期状态。"""
@@ -66,6 +76,13 @@ class TaskSpec(Generic[T]):
fn: fn:
待执行的可调用对象,可为同步或异步。其参数名驱动自动上下文 待执行的可调用对象,可为同步或异步。其参数名驱动自动上下文
注入(见 :mod:`pyflowx.context`)。 注入(见 :mod:`pyflowx.context`)。
若提供 ``cmd`` 参数,则此参数会被忽略。
cmd:
命令列表或 shell 字符串,支持三种形态:
- ``list[str]``: 命令及参数列表,如 ``["ls", "-la"]``
- ``str``: shell 命令字符串,如 ``"pip freeze > requirements.txt"``
- ``Callable``: Python 函数,与 ``fn`` 参数等效
若提供此参数,会自动包装为执行函数,覆盖 ``fn`` 参数。
depends_on: depends_on:
必须先完成才能运行本任务的任务名列表。顺序无关;框架会做 必须先完成才能运行本任务的任务名列表。顺序无关;框架会做
拓扑排序。 拓扑排序。
@@ -83,16 +100,26 @@ class TaskSpec(Generic[T]):
取消 worker future。 取消 worker future。
tags: tags:
自由标签,供 :meth:`Graph.subgraph` 做选择性执行与调试。 自由标签,供 :meth:`Graph.subgraph` 做选择性执行与调试。
conditions:
条件判断函数列表,只有所有条件都返回 ``True`` 时才执行任务。
若任一条件返回 ``False``,任务会被标记为 SKIPPED。
用于平台判断、环境变量检查等场景。
cwd:
命令执行的工作目录,仅在使用 ``cmd`` 参数时有效。
``None`` 表示当前目录。
""" """
name: str name: str
fn: TaskFn[T] fn: Optional[TaskFn[T]] = None
cmd: Optional[TaskCmd] = None
depends_on: Tuple[str, ...] = () depends_on: Tuple[str, ...] = ()
args: Tuple[Any, ...] = () args: Tuple[Any, ...] = ()
kwargs: Mapping[str, Any] = field(default_factory=dict) kwargs: Mapping[str, Any] = field(default_factory=dict)
retries: int = 0 retries: int = 0
timeout: Optional[float] = None timeout: Optional[float] = None
tags: Tuple[str, ...] = () tags: Tuple[str, ...] = ()
conditions: Tuple[Condition, ...] = ()
cwd: Optional[Path] = None
def __post_init__(self) -> None: def __post_init__(self) -> None:
if not self.name: if not self.name:
@@ -103,6 +130,117 @@ class TaskSpec(Generic[T]):
raise ValueError(f"TaskSpec '{self.name}': timeout must be > 0.") raise ValueError(f"TaskSpec '{self.name}': timeout must be > 0.")
if self.name in self.depends_on: if self.name in self.depends_on:
raise ValueError(f"TaskSpec '{self.name}' cannot depend on itself.") raise ValueError(f"TaskSpec '{self.name}' cannot depend on itself.")
if self.fn is None and self.cmd is None:
raise ValueError(f"TaskSpec '{self.name}': 必须提供 fn 或 cmd 参数。")
@property
def effective_fn(self) -> TaskFn[T]:
"""获取有效的执行函数.
若提供了 ``cmd`` 参数,则返回包装后的命令执行函数;
否则返回 ``fn`` 参数。
"""
if self.cmd is not None:
return self._wrap_cmd()
if self.fn is not None:
return self.fn
raise ValueError(f"TaskSpec '{self.name}': 没有可执行的函数或命令。")
def _wrap_cmd(self) -> TaskFn[T]:
"""将 cmd 包装为可执行函数.
Returns
-------
TaskFn[T]
包装后的执行函数.
"""
cmd = self.cmd
cwd = self.cwd
timeout = self.timeout
if isinstance(cmd, list):
def _run_list() -> T:
import subprocess
cmd_str = " ".join(str(arg) for arg in cmd)
try:
result = subprocess.run(
cmd,
cwd=cwd,
timeout=timeout,
capture_output=True,
text=True,
check=False,
)
except FileNotFoundError:
raise RuntimeError(f"命令未找到: {cmd_str}")
except subprocess.TimeoutExpired:
raise RuntimeError(f"命令执行超时: {cmd_str} ({timeout}s)")
except OSError as e:
raise RuntimeError(f"命令执行异常: {cmd_str}: {e}")
if result.returncode == 0:
return None # type: ignore[return-value]
err_msg = f"命令执行失败: `{cmd_str}`, 返回码: {result.returncode}"
if result.stderr.strip():
err_msg += f"\n{result.stderr.strip()}"
raise RuntimeError(err_msg)
_run_list.__name__ = self.name
return _run_list # type: ignore[return-value]
if isinstance(cmd, str):
def _run_shell() -> T:
import subprocess
try:
result = subprocess.run(
cmd,
shell=True,
cwd=cwd,
timeout=timeout,
capture_output=True,
text=True,
check=False,
)
except FileNotFoundError:
raise RuntimeError(f"Shell 命令未找到: {cmd}")
except subprocess.TimeoutExpired:
raise RuntimeError(f"Shell 命令执行超时: {cmd} ({timeout}s)")
except OSError as e:
raise RuntimeError(f"Shell 命令执行异常: {cmd}: {e}")
if result.returncode == 0:
return None # type: ignore[return-value]
err_msg = f"Shell 命令执行失败: `{cmd}`, 返回码: {result.returncode}"
if result.stderr.strip():
err_msg += f"\n{result.stderr.strip()}"
raise RuntimeError(err_msg)
_run_shell.__name__ = self.name
return _run_shell # type: ignore[return-value]
if callable(cmd):
return cmd # type: ignore[return-value]
raise TypeError(
f"TaskSpec '{self.name}': 不支持的 cmd 类型 {type(cmd).__name__}"
)
def should_execute(self) -> bool:
"""检查任务是否应该执行.
Returns
-------
bool
若所有条件都返回 ``True``,则返回 ``True``
否则返回 ``False``。
"""
return all(condition() for condition in self.conditions)
@dataclass @dataclass
+352
View File
@@ -0,0 +1,352 @@
"""测试 TaskSpec 的命令和条件执行功能."""
import sys
from pathlib import Path
import pytest
import pyflowx as px
from pyflowx.conditions import (
IS_LINUX,
IS_MACOS,
IS_WINDOWS,
BuiltinConditions,
)
# 跨平台的 echo 命令
if sys.platform == "win32":
ECHO_CMD = ["cmd", "/c", "echo"]
else:
ECHO_CMD = ["echo"]
def test_taskspec_with_cmd_list():
"""测试使用命令列表的 TaskSpec."""
graph = px.Graph.from_specs(
[
px.TaskSpec("echo_test", cmd=[*ECHO_CMD, "hello"]),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
assert "echo_test" in report.results
assert report.results["echo_test"].status == px.TaskStatus.SUCCESS
def test_taskspec_with_cmd_string():
"""测试使用 shell 命令字符串的 TaskSpec."""
if sys.platform == "win32":
shell_cmd = 'cmd /c "echo hello from shell"'
else:
shell_cmd = "echo 'hello from shell'"
graph = px.Graph.from_specs(
[
px.TaskSpec("shell_test", cmd=shell_cmd),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
assert "shell_test" in report.results
assert report.results["shell_test"].status == px.TaskStatus.SUCCESS
def test_taskspec_with_conditions_skip():
"""测试条件不满足时任务被跳过."""
# 创建一个永远不会满足的条件
def never_true():
return False
graph = px.Graph.from_specs(
[
px.TaskSpec(
"should_skip",
cmd=[*ECHO_CMD, "this should not run"],
conditions=(never_true,),
),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
assert "should_skip" in report.results
assert report.results["should_skip"].status == px.TaskStatus.SKIPPED
def test_taskspec_with_conditions_execute():
"""测试条件满足时任务正常执行."""
# 创建一个总是满足的条件
def always_true():
return True
graph = px.Graph.from_specs(
[
px.TaskSpec(
"should_run",
cmd=[*ECHO_CMD, "this should run"],
conditions=(always_true,),
),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
assert "should_run" in report.results
assert report.results["should_run"].status == px.TaskStatus.SUCCESS
def test_platform_conditions():
"""测试平台条件."""
if sys.platform == "win32":
win_cmd = ["cmd", "/c", "echo", "Windows"]
posix_cmd = ["echo", "POSIX"]
else:
win_cmd = ["echo", "Windows"]
posix_cmd = ["echo", "POSIX"]
graph = px.Graph.from_specs(
[
px.TaskSpec(
"win_task",
cmd=win_cmd,
conditions=(IS_WINDOWS,),
),
px.TaskSpec(
"linux_task",
cmd=posix_cmd,
conditions=(IS_LINUX,),
),
px.TaskSpec(
"macos_task",
cmd=posix_cmd,
conditions=(IS_MACOS,),
),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
# 检查只有当前平台的任务执行了
if sys.platform == "win32":
assert report.results["win_task"].status == px.TaskStatus.SUCCESS
assert report.results["linux_task"].status == px.TaskStatus.SKIPPED
assert report.results["macos_task"].status == px.TaskStatus.SKIPPED
elif sys.platform == "linux":
assert report.results["win_task"].status == px.TaskStatus.SKIPPED
assert report.results["linux_task"].status == px.TaskStatus.SUCCESS
assert report.results["macos_task"].status == px.TaskStatus.SKIPPED
elif sys.platform == "darwin":
assert report.results["win_task"].status == px.TaskStatus.SKIPPED
assert report.results["linux_task"].status == px.TaskStatus.SKIPPED
assert report.results["macos_task"].status == px.TaskStatus.SUCCESS
def test_app_installed_conditions():
"""测试应用安装条件."""
# 测试 python 应该总是安装的
if sys.platform == "win32":
python_cmd = ["python", "--version"]
else:
python_cmd = ["python3", "--version"]
graph = px.Graph.from_specs(
[
px.TaskSpec(
"python_check",
cmd=python_cmd,
conditions=(BuiltinConditions.HAS_APP_INSTALLED("python"),),
),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
assert "python_check" in report.results
# python 应该总是安装的
assert report.results["python_check"].status == px.TaskStatus.SUCCESS
def test_combined_conditions():
"""测试组合条件."""
# AND 条件
and_condition = BuiltinConditions.AND(
lambda: True,
lambda: True,
)
# OR 条件
or_condition = BuiltinConditions.OR(
lambda: True,
lambda: False,
)
# NOT 条件
not_condition = BuiltinConditions.NOT(lambda: False)
graph = px.Graph.from_specs(
[
px.TaskSpec(
"and_test",
cmd=[*ECHO_CMD, "AND"],
conditions=(and_condition,),
),
px.TaskSpec(
"or_test",
cmd=[*ECHO_CMD, "OR"],
conditions=(or_condition,),
),
px.TaskSpec(
"not_test",
cmd=[*ECHO_CMD, "NOT"],
conditions=(not_condition,),
),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
assert report.results["and_test"].status == px.TaskStatus.SUCCESS
assert report.results["or_test"].status == px.TaskStatus.SUCCESS
assert report.results["not_test"].status == px.TaskStatus.SUCCESS
def test_taskspec_with_cwd():
"""测试工作目录设置."""
if sys.platform == "win32":
ls_cmd = ["cmd", "/c", "dir"]
else:
ls_cmd = ["ls", "-la"]
graph = px.Graph.from_specs(
[
px.TaskSpec(
"list_current",
cmd=ls_cmd,
cwd=Path.cwd(),
),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
assert "list_current" in report.results
assert report.results["list_current"].status == px.TaskStatus.SUCCESS
def test_taskspec_with_timeout():
"""测试超时设置."""
graph = px.Graph.from_specs(
[
# 短时间任务应该成功
px.TaskSpec(
"short_task",
cmd=["python", "-c", "import time; time.sleep(0.1)"],
timeout=1.0,
),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
assert "short_task" in report.results
assert report.results["short_task"].status == px.TaskStatus.SUCCESS
def test_taskspec_dependency_with_conditions():
"""测试依赖和条件的组合."""
graph = px.Graph.from_specs(
[
px.TaskSpec(
"first",
cmd=[*ECHO_CMD, "first"],
conditions=(lambda: True,),
),
px.TaskSpec(
"second",
cmd=[*ECHO_CMD, "second"],
depends_on=("first",),
conditions=(lambda: True,),
),
px.TaskSpec(
"third",
cmd=[*ECHO_CMD, "third"],
depends_on=("second",),
),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
assert report.results["first"].status == px.TaskStatus.SUCCESS
assert report.results["second"].status == px.TaskStatus.SUCCESS
assert report.results["third"].status == px.TaskStatus.SUCCESS
def test_taskspec_mixed_fn_and_cmd():
"""测试混合使用 fn 和 cmd."""
def my_function():
return "result from function"
graph = px.Graph.from_specs(
[
px.TaskSpec("fn_task", fn=my_function),
px.TaskSpec("cmd_task", cmd=[*ECHO_CMD, "from command"]),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
assert report.results["fn_task"].status == px.TaskStatus.SUCCESS
assert report.results["fn_task"].value == "result from function"
assert report.results["cmd_task"].status == px.TaskStatus.SUCCESS
def test_taskspec_cmd_overrides_fn():
"""测试 cmd 参数优先于 fn 参数."""
def my_function():
return "should not run"
graph = px.Graph.from_specs(
[
px.TaskSpec(
"cmd_priority",
fn=my_function,
cmd=[*ECHO_CMD, "cmd takes priority"],
),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
assert report.results["cmd_priority"].status == px.TaskStatus.SUCCESS
# cmd 应该被执行,而不是 fn
assert report.results["cmd_priority"].value is None
def test_taskspec_callable_cmd():
"""测试 cmd 参数使用可调用对象."""
def my_callable():
return "callable result"
graph = px.Graph.from_specs(
[
px.TaskSpec("callable_cmd", cmd=my_callable),
]
)
report = px.run(graph, strategy="sequential")
assert report.success
assert report.results["callable_cmd"].status == px.TaskStatus.SUCCESS
assert report.results["callable_cmd"].value == "callable result"
if __name__ == "__main__":
pytest.main([__file__, "-v"])
Generated
+1 -1
View File
@@ -2193,7 +2193,7 @@ wheels = [
[[package]] [[package]]
name = "pyflowx" name = "pyflowx"
version = "0.1.1" version = "0.1.2"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "graphlib-backport", marker = "python_full_version < '3.9'" }, { name = "graphlib-backport", marker = "python_full_version < '3.9'" },