refactor: 整理代码格式与项目结构,修复命令检查bug

1. 重构多处列表展开写法,统一代码格式风格
2. 修复executors.py中命令不存在时的类型判断bug
3. 删除废弃的envlinux.py并替换为envdev.py,更新CLI入口配置
4. 为storage.py的后端方法添加override装饰器
5. 移除空的cli/__init__.py冗余导入
6. 更新pyproject.toml依赖与配置项
7. 精简测试用例代码
This commit is contained in:
2026-06-26 21:45:06 +08:00
parent 6f64d9d6dc
commit fd282db28f
17 changed files with 229 additions and 286 deletions
+7 -4
View File
@@ -10,7 +10,10 @@ classifiers = [
"Programming Language :: Python :: 3.9",
"Topic :: Software Development :: Libraries :: Application Frameworks",
]
dependencies = ["graphlib_backport >= 1.0.0; python_version < '3.9'"]
dependencies = [
"graphlib_backport >= 1.0.0; python_version < '3.9'",
"typing-extensions>=4.13.2",
]
description = "Lightweight, type-safe DAG task scheduler with multi-strategy execution."
keywords = ["async", "dag", "scheduler", "task", "workflow"]
license = { text = "MIT" }
@@ -22,9 +25,9 @@ version = "0.2.3"
[project.scripts]
autofmt = "pyflowx.cli.autofmt:main"
bumpversion = "pyflowx.cli.bumpversion:main"
clr = "pyflowx.cli.clearscreen:main"
cls = "pyflowx.cli.clearscreen:main"
emlman = "pyflowx.cli.emlmanager:main"
envlinux = "pyflowx.cli.envlinux:main"
envdev = "pyflowx.cli.envdev:main"
envpy = "pyflowx.cli.envpy:main"
envqt = "pyflowx.cli.envqt:main"
envrs = "pyflowx.cli.envrs:main"
@@ -146,6 +149,6 @@ select = [
"**/tests/**" = ["ARG001", "ARG002"]
[tool.pyrefly]
preset = "basic"
preset = "strict"
project-includes = ["**/*.ipynb", "**/*.py*"]
python-version = "3.8"
-78
View File
@@ -1,78 +0,0 @@
"""CLI 工具模块.
提供各种命令行工具的入口点.
"""
from __future__ import annotations
# 自动格式化工具
from pyflowx.cli.autofmt import main as autofmt_main
from pyflowx.cli.bumpversion import main as bumpversion_main
from pyflowx.cli.clearscreen import main as clearscreen_main
# EML 邮件管理工具
from pyflowx.cli.emlmanager import main as emlmanager_main
# EML 邮件管理工具
from pyflowx.cli.emlmanager import main as emlmanager_web_main
from pyflowx.cli.envpy import main as envpy_main
from pyflowx.cli.envqt import main as envqt_main
from pyflowx.cli.envrs import main as envrs_main
# 文件工具
from pyflowx.cli.filedate import main as filedate_main
from pyflowx.cli.filelevel import main as filelevel_main
from pyflowx.cli.folderback import main as folderback_main
from pyflowx.cli.folderzip import main as folderzip_main
# Git 工具
from pyflowx.cli.gittool import main as gittool_main
# 仿真工具
from pyflowx.cli.lscalc import main as lscalc_main
# 打包工具
from pyflowx.cli.packtool import main as packtool_main
# PDF 工具
from pyflowx.cli.pdftool import main as pdftool_main
# 开发工具
from pyflowx.cli.piptool import main as piptool_main
from pyflowx.cli.pymake import main as pymake_main
from pyflowx.cli.screenshot import main as screenshot_main
from pyflowx.cli.sshcopyid import main as sshcopyid_main
__all__ = [
# 自动格式化工具
"autofmt_main",
"bumpversion_main",
"clearscreen_main",
# EML 邮件管理工具
"emlmanager_main",
"emlmanager_web_main",
"envpy_main",
"envqt_main",
"envrs_main",
# 文件工具
"filedate_main",
"filelevel_main",
"folderback_main",
"folderzip_main",
# Git 工具
"gittool_main",
# 仿真工具
"lscalc_main",
# 打包工具
"packtool_main",
# PDF 工具
"pdftool_main",
# 开发工具
"piptool_main",
"pymake_main",
"screenshot_main",
"sshcopyid_main",
# 系统工具
"taskkill_main",
"which_main",
]
+6 -6
View File
@@ -268,13 +268,13 @@ def main() -> None:
cmd.extend(["--fix", "--unsafe-fixes"])
graph = px.Graph.from_specs([px.TaskSpec("ruff_check", cmd=cmd, verbose=True)])
elif args.command == "doc":
graph = px.Graph.from_specs(
[px.TaskSpec("auto_docstring", fn=auto_add_docstrings, args=(Path(args.root_dir),), verbose=True)]
)
graph = px.Graph.from_specs([
px.TaskSpec("auto_docstring", fn=auto_add_docstrings, args=(Path(args.root_dir),), verbose=True)
])
elif args.command == "sync":
graph = px.Graph.from_specs(
[px.TaskSpec("sync_config", fn=sync_pyproject_config, args=(Path(args.root_dir),), verbose=True)]
)
graph = px.Graph.from_specs([
px.TaskSpec("sync_config", fn=sync_pyproject_config, args=(Path(args.root_dir),), verbose=True)
])
else:
parser.print_help()
return
+23 -23
View File
@@ -212,16 +212,14 @@ def main() -> None:
# 更新所有文件的版本号(使用顺序执行避免竞争条件)
# 使用相对于 cwd 的路径作为任务名,确保唯一性
graph = px.Graph.from_specs(
[
px.TaskSpec(
f"bump_{file.relative_to(Path.cwd())}".replace("\\", "_").replace("/", "_").replace(".", "_"),
fn=bump_file_version,
args=(file, part),
)
for file in all_files
]
)
graph = px.Graph.from_specs([
px.TaskSpec(
f"bump_{file.relative_to(Path.cwd())}".replace("\\", "_").replace("/", "_").replace(".", "_"),
fn=bump_file_version,
args=(file, part),
)
for file in all_files
])
report = px.run(graph, strategy="sequential")
# 收集新版本号(取第一个成功的结果)
@@ -239,23 +237,25 @@ def main() -> None:
print(f"版本号已更新为: {new_version}")
# 提交修改
graph = px.Graph.from_specs(
[
px.TaskSpec("git_add", cmd=["git", "add", "."]),
px.TaskSpec(
"git_commit", cmd=["git", "commit", "-m", f"bump version to {new_version}"], depends_on=["git_add"]
),
]
)
graph = px.Graph.from_specs([
px.TaskSpec("git_add", cmd=["git", "add", "."]),
px.TaskSpec(
"git_commit",
cmd=["git", "commit", "-m", f"bump version to {new_version}"],
depends_on=("git_add",),
),
])
px.run(graph, strategy="sequential")
# 创建 git tag
if not args.no_tag:
tag_name = f"v{new_version}"
graph = px.Graph.from_specs(
[
px.TaskSpec("git_tag", cmd=["git", "tag", "-a", tag_name, "-m", f"Release {tag_name}"]),
]
)
graph = px.Graph.from_specs([
px.TaskSpec(
"git_tag",
cmd=["git", "tag", "-a", tag_name, "-m", f"Release {tag_name}"],
depends_on=("git_commit",),
),
])
px.run(graph, strategy="sequential")
print(f"已创建标签: {tag_name}")
+5 -13
View File
@@ -5,23 +5,15 @@
from __future__ import annotations
import subprocess
import pyflowx as px
from pyflowx.conditions import Constants
def clear_screen() -> None:
"""使用系统命令清屏."""
if Constants.IS_WINDOWS:
subprocess.run(["cmd", "/c", "cls"], check=False)
else:
subprocess.run(["clear"], check=False)
print("\033[2J\033[H", end="")
def main() -> None:
"""清屏工具主函数."""
graph = px.Graph.from_specs([px.TaskSpec("clearscreen", fn=clear_screen)])
graph = px.Graph.from_specs([
px.TaskSpec("cls_win", cmd=["cmd", "/c", "cls"], conditions=(lambda: Constants.IS_WINDOWS,)),
px.TaskSpec("cls_unix", cmd=["clear"], conditions=(lambda: not Constants.IS_WINDOWS,)),
px.TaskSpec("cls_ascii", fn=lambda: print("\033[2J\033[H", end="")),
])
px.run(graph, strategy="thread")
+59
View File
@@ -0,0 +1,59 @@
from typing import TypedDict
import pyflowx as px
class EnvConfig(TypedDict):
"""环境配置项."""
name: str
value: str
description: str
PIP_INDEX_URL_CONFIG: EnvConfig = {
"name": "PIP_INDEX_URL",
"value": "https://pypi.tuna.tsinghua.edu.cn/simple",
"description": "PIP索引URL",
}
# ============================================================================
# 配置
# ============================================================================
PIP_INDEX_URLS: dict[str, str] = {
"tsinghua": "https://pypi.tuna.tsinghua.edu.cn/simple",
"aliyun": "https://mirrors.aliyun.com/pypi/simple/",
}
PIP_TRUSTED_HOSTS: dict[str, str] = {
"tsinghua": "pypi.tuna.tsinghua.edu.cn",
"aliyun": "mirrors.aliyun.com",
}
UV_INDEX_URL: str = "https://mirrors.aliyun.com/pypi/simple/"
UV_PYTHON_INSTALL_MIRROR: str = "https://registry.npmmirror.com/-/binary/python-build-standalone"
CONDA_MIRROR_URLS: dict[str, list[str]] = {
"tsinghua": [
"https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/",
"https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/",
"https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/",
],
"aliyun": [
"https://mirrors.aliyun.com/anaconda/pkgs/main/",
"https://mirrors.aliyun.com/anaconda/pkgs/free/",
"https://mirrors.aliyun.com/anaconda/cloud/conda-forge/",
],
}
def main() -> None:
"""主函数."""
# 使用更安全的分步执行方式,便于调试和捕获错误
graph = px.Graph.from_specs([
px.TaskSpec("download", cmd="curl -sSL https://linuxmirrors.cn/main.sh -o /tmp/linuxmirrors.sh", verbose=True),
px.TaskSpec("install", cmd="sudo bash /tmp/linuxmirrors.sh", verbose=True, depends_on=("download",)),
])
px.run(graph, strategy="thread")
-11
View File
@@ -1,11 +0,0 @@
import pyflowx as px
def main() -> None:
"""主函数."""
# 使用更安全的分步执行方式,便于调试和捕获错误
graph = px.Graph.from_specs([
px.TaskSpec("download", cmd="curl -sSL https://linuxmirrors.cn/main.sh -o /tmp/linuxmirrors.sh", verbose=True),
px.TaskSpec("install", cmd="sudo bash /tmp/linuxmirrors.sh", verbose=True, depends_on=("download",)),
])
px.run(graph, strategy="thread")
+16 -20
View File
@@ -113,27 +113,23 @@ def main() -> None:
args = parser.parse_args()
if args.command == "add":
graph = px.Graph.from_specs(
[
px.TaskSpec(
"process_files_date",
fn=process_files_date,
args=([Path(f) for f in args.files],),
kwargs={"clear": False},
)
]
)
graph = px.Graph.from_specs([
px.TaskSpec(
"process_files_date",
fn=process_files_date,
args=([Path(f) for f in args.files],),
kwargs={"clear": False},
)
])
elif args.command == "clear":
graph = px.Graph.from_specs(
[
px.TaskSpec(
"process_files_date",
fn=process_files_date,
args=([Path(f) for f in args.files],),
kwargs={"clear": True},
)
]
)
graph = px.Graph.from_specs([
px.TaskSpec(
"process_files_date",
fn=process_files_date,
args=([Path(f) for f in args.files],),
kwargs={"clear": True},
)
])
else:
parser.print_help()
return
+59 -67
View File
@@ -436,87 +436,79 @@ def main() -> None: # noqa: PLR0912
args = parser.parse_args()
if args.command == "m":
graph = px.Graph.from_specs(
[px.TaskSpec("pdf_merge", fn=pdf_merge, args=([Path(p) for p in args.inputs], Path(args.output)))]
)
graph = px.Graph.from_specs([
px.TaskSpec("pdf_merge", fn=pdf_merge, args=([Path(p) for p in args.inputs], Path(args.output)))
])
elif args.command == "s":
graph = px.Graph.from_specs(
[px.TaskSpec("pdf_split", fn=pdf_split, args=(Path(args.input), Path(args.output_dir)))]
)
graph = px.Graph.from_specs([
px.TaskSpec("pdf_split", fn=pdf_split, args=(Path(args.input), Path(args.output_dir)))
])
elif args.command == "c":
graph = px.Graph.from_specs(
[px.TaskSpec("pdf_compress", fn=pdf_compress, args=(Path(args.input), Path(args.output)))]
)
graph = px.Graph.from_specs([
px.TaskSpec("pdf_compress", fn=pdf_compress, args=(Path(args.input), Path(args.output)))
])
elif args.command == "e":
graph = px.Graph.from_specs(
[px.TaskSpec("pdf_encrypt", fn=pdf_encrypt, args=(Path(args.input), Path(args.output), args.password))]
)
graph = px.Graph.from_specs([
px.TaskSpec("pdf_encrypt", fn=pdf_encrypt, args=(Path(args.input), Path(args.output), args.password))
])
elif args.command == "d":
graph = px.Graph.from_specs(
[px.TaskSpec("pdf_decrypt", fn=pdf_decrypt, args=(Path(args.input), Path(args.output), args.password))]
)
graph = px.Graph.from_specs([
px.TaskSpec("pdf_decrypt", fn=pdf_decrypt, args=(Path(args.input), Path(args.output), args.password))
])
elif args.command == "xt":
graph = px.Graph.from_specs(
[px.TaskSpec("pdf_extract_text", fn=pdf_extract_text, args=(Path(args.input), Path(args.output)))]
)
graph = px.Graph.from_specs([
px.TaskSpec("pdf_extract_text", fn=pdf_extract_text, args=(Path(args.input), Path(args.output)))
])
elif args.command == "xi":
graph = px.Graph.from_specs(
[px.TaskSpec("pdf_extract_images", fn=pdf_extract_images, args=(Path(args.input), Path(args.output_dir)))]
)
graph = px.Graph.from_specs([
px.TaskSpec("pdf_extract_images", fn=pdf_extract_images, args=(Path(args.input), Path(args.output_dir)))
])
elif args.command == "w":
graph = px.Graph.from_specs(
[
px.TaskSpec(
"pdf_watermark",
fn=pdf_add_watermark,
args=(Path(args.input), Path(args.output)),
kwargs={"text": args.text},
)
]
)
graph = px.Graph.from_specs([
px.TaskSpec(
"pdf_watermark",
fn=pdf_add_watermark,
args=(Path(args.input), Path(args.output)),
kwargs={"text": args.text},
)
])
elif args.command == "r":
graph = px.Graph.from_specs(
[
px.TaskSpec(
"pdf_rotate",
fn=pdf_rotate,
args=(Path(args.input), Path(args.output)),
kwargs={"rotation": args.rotation},
)
]
)
graph = px.Graph.from_specs([
px.TaskSpec(
"pdf_rotate",
fn=pdf_rotate,
args=(Path(args.input), Path(args.output)),
kwargs={"rotation": args.rotation},
)
])
elif args.command == "crop":
graph = px.Graph.from_specs(
[
px.TaskSpec(
"pdf_crop",
fn=pdf_crop,
args=(Path(args.input), Path(args.output)),
kwargs={"margins": (args.left, args.top, args.right, args.bottom)},
)
]
)
graph = px.Graph.from_specs([
px.TaskSpec(
"pdf_crop",
fn=pdf_crop,
args=(Path(args.input), Path(args.output)),
kwargs={"margins": (args.left, args.top, args.right, args.bottom)},
)
])
elif args.command == "i":
graph = px.Graph.from_specs([px.TaskSpec("pdf_info", fn=pdf_info, args=(Path(args.input),))])
elif args.command == "ocr":
graph = px.Graph.from_specs(
[px.TaskSpec("pdf_ocr", fn=pdf_ocr, args=(Path(args.input), Path(args.output)), kwargs={"lang": args.lang})]
)
graph = px.Graph.from_specs([
px.TaskSpec("pdf_ocr", fn=pdf_ocr, args=(Path(args.input), Path(args.output)), kwargs={"lang": args.lang})
])
elif args.command == "img":
graph = px.Graph.from_specs(
[
px.TaskSpec(
"pdf_to_images",
fn=pdf_to_images,
args=(Path(args.input), Path(args.output_dir)),
kwargs={"dpi": args.dpi},
)
]
)
graph = px.Graph.from_specs([
px.TaskSpec(
"pdf_to_images",
fn=pdf_to_images,
args=(Path(args.input), Path(args.output_dir)),
kwargs={"dpi": args.dpi},
)
])
elif args.command == "repair":
graph = px.Graph.from_specs(
[px.TaskSpec("pdf_repair", fn=pdf_repair, args=(Path(args.input), Path(args.output)))]
)
graph = px.Graph.from_specs([
px.TaskSpec("pdf_repair", fn=pdf_repair, args=(Path(args.input), Path(args.output)))
])
else:
parser.print_help()
return
+6 -8
View File
@@ -31,14 +31,12 @@ def aggregate(ctx: px.Context) -> dict[str, Any]:
def main() -> None:
graph = px.Graph.from_specs(
[
# Static positional args parameterise the same function twice.
px.TaskSpec("fetch_user", fetch_user, args=(1,)),
px.TaskSpec("fetch_posts", fetch_posts, args=(1,)),
px.TaskSpec("aggregate", aggregate, depends_on=("fetch_user", "fetch_posts")),
]
)
graph = px.Graph.from_specs([
# Static positional args parameterise the same function twice.
px.TaskSpec("fetch_user", fetch_user, args=(1,)),
px.TaskSpec("fetch_posts", fetch_posts, args=(1,)),
px.TaskSpec("aggregate", aggregate, depends_on=("fetch_user", "fetch_posts")),
])
print("=== Dry run ===")
_ = px.run(graph, strategy="async", dry_run=True)
+19 -19
View File
@@ -10,19 +10,21 @@ Demonstrates the core PyFlowX workflow:
from __future__ import annotations
from typing import Any
import pyflowx as px
# --- task functions: pure, testable, no framework coupling ------------- #
def extract_customers() -> list[dict]:
def extract_customers() -> list[dict[str, Any]]:
return [
{"id": "C001", "name": "Alice"},
{"id": "C002", "name": "Bob"},
]
def extract_orders() -> list[dict]:
def extract_orders() -> list[dict[str, Any]]:
return [
{"id": "O001", "customer_id": "C001", "amount": 150.0},
{"id": "O002", "customer_id": "C002", "amount": 200.5},
@@ -31,32 +33,30 @@ def extract_orders() -> list[dict]:
# Parameter names match dependency names → automatic injection.
def transform(
extract_customers: list[dict],
extract_orders: list[dict],
) -> list[dict]:
extract_customers: list[dict[str, Any]],
extract_orders: list[dict[str, Any]],
) -> list[dict[str, Any]]:
cmap = {c["id"]: c for c in extract_customers}
return [{**o, "customer_name": cmap[o["customer_id"]]["name"]} for o in extract_orders if o["customer_id"] in cmap]
def load(transform: list[dict]) -> int:
def load(transform: list[dict[str, Any]]) -> int:
print(f" loaded {len(transform)} records")
return len(transform)
def main() -> None:
graph = px.Graph.from_specs(
[
px.TaskSpec("extract_customers", extract_customers, tags=("extract",)),
px.TaskSpec("extract_orders", extract_orders, tags=("extract",)),
px.TaskSpec(
"transform",
transform,
depends_on=("extract_customers", "extract_orders"),
tags=("transform",),
),
px.TaskSpec("load", load, depends_on=("transform",), retries=1, tags=("load",)),
]
)
graph = px.Graph.from_specs([
px.TaskSpec("extract_customers", extract_customers, tags=("extract",)),
px.TaskSpec("extract_orders", extract_orders, tags=("extract",)),
px.TaskSpec(
"transform",
transform,
depends_on=("extract_customers", "extract_orders"),
tags=("transform",),
),
px.TaskSpec("load", load, depends_on=("transform",), retries=1, tags=("load",)),
])
print("=== Execution plan ===")
print(graph.describe())
+5 -7
View File
@@ -29,13 +29,11 @@ def merge(fetch_a: str, fetch_b: str) -> str:
def main() -> None:
graph = px.Graph.from_specs(
[
px.TaskSpec("fetch_a", fetch_a),
px.TaskSpec("fetch_b", fetch_b),
px.TaskSpec("merge", merge, depends_on=("fetch_a", "fetch_b")),
]
)
graph = px.Graph.from_specs([
px.TaskSpec("fetch_a", fetch_a),
px.TaskSpec("fetch_b", fetch_b),
px.TaskSpec("merge", merge, depends_on=("fetch_a", "fetch_b")),
])
print("=== Mermaid diagram ===")
print(graph.to_mermaid("LR"))
+3 -1
View File
@@ -132,7 +132,9 @@ def _check_conditions_for_skip(
if failed_conditions:
return f"条件不满足: {', '.join(failed_conditions)}"
elif spec.skip_if_missing and not spec._is_cmd_available():
return f"命令不存在: {spec.cmd[0] if spec.cmd else 'unknown'}"
# _is_cmd_available() 仅对 list[str] 类型返回 False
cmd_name = spec.cmd[0] if isinstance(spec.cmd, list) and spec.cmd else "unknown"
return f"命令不存在: {cmd_name}"
else:
return "条件不满足"
+12
View File
@@ -21,6 +21,8 @@ from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Mapping
from typing_extensions import override
from .errors import StorageError
@@ -54,18 +56,23 @@ class MemoryBackend(StateBackend):
def __init__(self) -> None:
self._store: dict[str, Any] = {}
@override
def load(self) -> Mapping[str, Any]:
return dict(self._store)
@override
def save(self, name: str, value: Any) -> None:
self._store[name] = value
@override
def has(self, name: str) -> bool:
return name in self._store
@override
def get(self, name: str) -> Any:
return self._store[name]
@override
def clear(self) -> None:
self._store.clear()
@@ -104,9 +111,11 @@ class JSONBackend(StateBackend):
except (OSError, TypeError) as exc:
raise StorageError(f"cannot write state file {self._path!r}", exc) from exc
@override
def load(self) -> Mapping[str, Any]:
return dict(self._store)
@override
def save(self, name: str, value: Any) -> None:
# 在修改内存状态前先校验可序列化性。
try:
@@ -116,12 +125,15 @@ class JSONBackend(StateBackend):
self._store[name] = value
self._flush()
@override
def has(self, name: str) -> bool:
return name in self._store
@override
def get(self, name: str) -> Any:
return self._store[name]
@override
def clear(self) -> None:
self._store.clear()
self._flush()
+3 -4
View File
@@ -174,19 +174,18 @@ class TaskSpec(Generic[T]):
verbose = self.verbose
if isinstance(cmd, list):
cmd_list = cast(List[str], cmd)
def _run_list() -> T:
import subprocess
cmd_str = " ".join(str(arg) for arg in cmd_list)
cmd_str = " ".join(arg for arg in cmd)
if verbose:
print(f"[verbose] 执行命令: {cmd_str}", flush=True)
if cwd is not None:
print(f"[verbose] 工作目录: {cwd}", flush=True)
try:
result = subprocess.run(
cmd_list,
cmd,
cwd=cwd,
timeout=timeout,
capture_output=not verbose,
@@ -288,7 +287,7 @@ class TaskSpec(Generic[T]):
cmd = self.cmd
if isinstance(cmd, list) and cmd:
first_arg = cast(str, cmd[0])
first_arg = cmd[0]
return shutil.which(first_arg) is not None
return True
+1 -24
View File
@@ -2,33 +2,10 @@
from __future__ import annotations
from unittest.mock import MagicMock, patch
from unittest.mock import patch
import pyflowx as px
from pyflowx.cli import clearscreen
from pyflowx.conditions import Constants
# ---------------------------------------------------------------------- #
# clear_screen
# ---------------------------------------------------------------------- #
class TestClearScreen:
"""Test clear_screen function."""
def test_clear_screen_windows(self) -> None:
"""Should clear screen on Windows."""
if Constants.IS_WINDOWS:
with patch("subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0)
clearscreen.clear_screen()
assert mock_run.called
def test_clear_screen_linux(self) -> None:
"""Should clear screen on Linux."""
with patch.object(Constants, "IS_WINDOWS", False), patch("subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0)
clearscreen.clear_screen()
assert mock_run.called
# ---------------------------------------------------------------------- #
Generated
+5 -1
View File
@@ -2184,10 +2184,12 @@ wheels = [
[[package]]
name = "pyflowx"
version = "0.2.2"
version = "0.2.3"
source = { editable = "." }
dependencies = [
{ name = "graphlib-backport", marker = "python_full_version < '3.9'" },
{ name = "typing-extensions", version = "4.13.2", source = { registry = "https://mirrors.aliyun.com/pypi/simple/" }, marker = "python_full_version < '3.9'" },
{ name = "typing-extensions", version = "4.15.0", source = { registry = "https://mirrors.aliyun.com/pypi/simple/" }, marker = "python_full_version >= '3.9'" },
]
[package.optional-dependencies]
@@ -2257,6 +2259,7 @@ requires-dist = [
{ name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8.0" },
{ name = "tox", marker = "extra == 'dev'", specifier = ">=4.25.0" },
{ name = "tox-uv", marker = "extra == 'dev'", specifier = ">=1.13.1" },
{ name = "typing-extensions", specifier = ">=4.13.2" },
]
provides-extras = ["dev", "office"]
@@ -3179,6 +3182,7 @@ name = "typing-extensions"
version = "4.15.0"
source = { registry = "https://mirrors.aliyun.com/pypi/simple/" }
resolution-markers = [
"python_full_version >= '3.15'",
"python_full_version >= '3.10' and python_full_version < '3.15'",
"python_full_version > '3.9' and python_full_version < '3.10'",
"python_full_version == '3.9'",