暂无邮件
+请先打开目录导入邮件
+邮件详情
+请选择左侧邮件查看详情
+diff --git a/pyproject.toml b/pyproject.toml index 26f30e8..790741a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ version = "0.1.8" autofmt = "pyflowx.cli.autofmt:main" bumpver = "pyflowx.cli.bumpversion:main" clr = "pyflowx.cli.clearscreen:main" +emlman = "pyflowx.cli.emlmanager:main" envpy = "pyflowx.cli.envpy:main" envqt = "pyflowx.cli.envqt:main" envrs = "pyflowx.cli.envrs:main" diff --git a/src/pyflowx/cli/__init__.py b/src/pyflowx/cli/__init__.py index e74def8..278ad14 100644 --- a/src/pyflowx/cli/__init__.py +++ b/src/pyflowx/cli/__init__.py @@ -9,6 +9,12 @@ from __future__ import annotations from pyflowx.cli.autofmt import main as autofmt_main from pyflowx.cli.bumpversion import main as bumpversion_main from pyflowx.cli.clearscreen import main as clearscreen_main + +# EML 邮件管理工具 +from pyflowx.cli.emlmanager import main as emlmanager_main + +# EML 邮件管理工具 +from pyflowx.cli.emlmanager import main as emlmanager_web_main from pyflowx.cli.envpy import main as envpy_main from pyflowx.cli.envqt import main as envqt_main from pyflowx.cli.envrs import main as envrs_main @@ -37,15 +43,14 @@ from pyflowx.cli.pymake import main as pymake_main from pyflowx.cli.screenshot import main as screenshot_main from pyflowx.cli.sshcopyid import main as sshcopyid_main -# 系统工具 -from pyflowx.cli.taskkill import main as taskkill_main -from pyflowx.cli.which import main as which_main - __all__ = [ # 自动格式化工具 "autofmt_main", "bumpversion_main", "clearscreen_main", + # EML 邮件管理工具 + "emlmanager_main", + "emlmanager_web_main", "envpy_main", "envqt_main", "envrs_main", diff --git a/src/pyflowx/cli/emlmanager.py b/src/pyflowx/cli/emlmanager.py new file mode 100644 index 0000000..361bf19 --- /dev/null +++ b/src/pyflowx/cli/emlmanager.py @@ -0,0 +1,1189 @@ +"""EML 邮件管理工具 Web 版本. + +提供基于 Web 的 EML 邮件文件管理功能, +支持邮件读取、数据库存储、搜索和聚合显示. +""" + +from __future__ import annotations + +import argparse +import email +import hashlib +import json +import sqlite3 +import threading +from datetime import datetime +from email.header import decode_header +from email.utils import parsedate_to_datetime +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path +from typing import Any +from urllib.parse import parse_qs, urlparse + +# ============================================================================ +# 配置 +# ============================================================================ + +DB_NAME = "eml_manager.db" +TABLE_NAME = "emails" +DEFAULT_PORT = 8080 + + +# ============================================================================ +# 数据库管理 +# ============================================================================ + + +class EmailDatabase: + """邮件数据库管理类.""" + + def __init__(self, db_path: Path): + """初始化数据库连接.""" + self.db_path = db_path + self.conn: sqlite3.Connection | None = None + self._lock = threading.Lock() + self._init_db() + + def _init_db(self) -> None: + """初始化数据库表结构.""" + self.conn = sqlite3.connect(str(self.db_path), check_same_thread=False) + cursor = self.conn.cursor() + + # 创建邮件表 + cursor.execute( + f""" + CREATE TABLE IF NOT EXISTS {TABLE_NAME} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + file_path TEXT UNIQUE NOT NULL, + file_hash TEXT NOT NULL, + subject TEXT, + sender TEXT, + recipients TEXT, + date TEXT, + date_parsed TEXT, + body_text TEXT, + body_html TEXT, + has_attachments INTEGER DEFAULT 0, + file_size INTEGER, + created_at TEXT, + updated_at TEXT + ) + """ + ) + + # 创建索引 + cursor.execute(f"CREATE INDEX IF NOT EXISTS idx_subject ON {TABLE_NAME}(subject)") + cursor.execute(f"CREATE INDEX IF NOT EXISTS idx_sender ON {TABLE_NAME}(sender)") + cursor.execute(f"CREATE INDEX IF NOT EXISTS idx_date ON {TABLE_NAME}(date_parsed)") + cursor.execute(f"CREATE INDEX IF NOT EXISTS idx_file_hash ON {TABLE_NAME}(file_hash)") + + self.conn.commit() + + def insert_email(self, email_data: dict[str, Any]) -> bool: + """插入邮件数据.""" + try: + with self._lock: + cursor = self.conn.cursor() + cursor.execute( + f""" + INSERT OR REPLACE INTO {TABLE_NAME} + (file_path, file_hash, subject, sender, recipients, date, date_parsed, + body_text, body_html, has_attachments, file_size, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + email_data["file_path"], + email_data["file_hash"], + email_data.get("subject", ""), + email_data.get("sender", ""), + email_data.get("recipients", ""), + email_data.get("date", ""), + email_data.get("date_parsed", ""), + email_data.get("body_text", ""), + email_data.get("body_html", ""), + email_data.get("has_attachments", 0), + email_data.get("file_size", 0), + datetime.now().isoformat(), + datetime.now().isoformat(), + ), + ) + self.conn.commit() + return True + except sqlite3.Error: + return False + + def search_emails(self, keyword: str = "", field: str = "all") -> list[dict[str, Any]]: + """搜索邮件.""" + with self._lock: + cursor = self.conn.cursor() + + if not keyword: + cursor.execute(f"SELECT * FROM {TABLE_NAME} ORDER BY date_parsed DESC") + elif field == "subject": + query = f"SELECT * FROM {TABLE_NAME} WHERE subject LIKE ? ORDER BY date_parsed DESC" + cursor.execute(query, (f"%{keyword}%",)) + elif field == "sender": + query = f"SELECT * FROM {TABLE_NAME} WHERE sender LIKE ? ORDER BY date_parsed DESC" + cursor.execute(query, (f"%{keyword}%",)) + elif field == "recipients": + query = f"SELECT * FROM {TABLE_NAME} WHERE recipients LIKE ? ORDER BY date_parsed DESC" + cursor.execute(query, (f"%{keyword}%",)) + else: # all + query = f""" + SELECT * FROM {TABLE_NAME} + WHERE subject LIKE ? OR sender LIKE ? OR recipients LIKE ? OR body_text LIKE ? + ORDER BY date_parsed DESC + """ + cursor.execute(query, (f"%{keyword}%", f"%{keyword}%", f"%{keyword}%", f"%{keyword}%")) + + columns = [description[0] for description in cursor.description] + return [dict(zip(columns, row)) for row in cursor.fetchall()] + + def get_grouped_emails(self) -> dict[str, list[dict[str, Any]]]: + """获取按主题分组的邮件.""" + with self._lock: + cursor = self.conn.cursor() + cursor.execute(f"SELECT * FROM {TABLE_NAME} ORDER BY subject, date_parsed DESC") + + columns = [description[0] for description in cursor.description] + emails = [dict(zip(columns, row)) for row in cursor.fetchall()] + + # 按主题分组 + grouped: dict[str, list[dict[str, Any]]] = {} + for email_data in emails: + subject = email_data.get("subject", "") or "(无主题)" + # 标准化主题(去除Re:、Fwd:等前缀) + normalized_subject = self._normalize_subject(subject) + if normalized_subject not in grouped: + grouped[normalized_subject] = [] + grouped[normalized_subject].append(email_data) + + return grouped + + def _normalize_subject(self, subject: str) -> str: + """标准化邮件主题.""" + import re + + # 移除 Re:, Fwd:, FW: 等前缀 + normalized = re.sub(r"^(Re|Fwd|FW|Fw):\s*", "", subject, flags=re.IGNORECASE) + return normalized.strip() + + def get_email_count(self) -> int: + """获取邮件总数.""" + with self._lock: + cursor = self.conn.cursor() + cursor.execute(f"SELECT COUNT(*) FROM {TABLE_NAME}") + return cursor.fetchone()[0] + + def clear_all(self) -> None: + """清空所有邮件数据.""" + with self._lock: + cursor = self.conn.cursor() + cursor.execute(f"DELETE FROM {TABLE_NAME}") + self.conn.commit() + + def close(self) -> None: + """关闭数据库连接.""" + if self.conn: + self.conn.close() + + +# ============================================================================ +# EML 文件解析 +# ============================================================================ + + +def decode_mime_words(s: str) -> str: + """解码 MIME 编码的字符串.""" + if not s: + return "" + + decoded_list = decode_header(s) + decoded_string = "" + for part, encoding in decoded_list: + if isinstance(part, bytes): + decoded_string += part.decode(encoding or "utf-8", errors="ignore") + else: + decoded_string += str(part) + + return decoded_string + + +def parse_eml_file(file_path: Path) -> dict[str, Any] | None: + """解析 EML 文件.""" + try: + with open(file_path, "rb") as f: + msg = email.message_from_binary_file(f) + + # 计算文件哈希 + file_hash = hashlib.md5(file_path.read_bytes()).hexdigest() + file_size = file_path.stat().st_size + + # 提取基本信息 + subject = decode_mime_words(msg.get("Subject", "")) + sender = decode_mime_words(msg.get("From", "")) + recipients = decode_mime_words(msg.get("To", "")) + date_str = msg.get("Date", "") + + # 解析日期 + date_parsed = "" + if date_str: + try: + dt = parsedate_to_datetime(date_str) + date_parsed = dt.isoformat() + except Exception: + date_parsed = date_str + + # 提取正文 + body_text = "" + body_html = "" + has_attachments = 0 + + if msg.is_multipart(): + for part in msg.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition", "")) + + # 检查附件 + if "attachment" in content_disposition: + has_attachments = 1 + continue + + # 提取正文 + if content_type == "text/plain" and not body_text: + try: + payload = part.get_payload(decode=True) + charset = part.get_content_charset() or "utf-8" + body_text = payload.decode(charset, errors="ignore") + except Exception: + pass + elif content_type == "text/html" and not body_html: + try: + payload = part.get_payload(decode=True) + charset = part.get_content_charset() or "utf-8" + body_html = payload.decode(charset, errors="ignore") + except Exception: + pass + else: + content_type = msg.get_content_type() + try: + payload = msg.get_payload(decode=True) + charset = msg.get_content_charset() or "utf-8" + if content_type == "text/plain": + body_text = payload.decode(charset, errors="ignore") + elif content_type == "text/html": + body_html = payload.decode(charset, errors="ignore") + except Exception: + pass + + return { + "file_path": str(file_path), + "file_hash": file_hash, + "subject": subject, + "sender": sender, + "recipients": recipients, + "date": date_str, + "date_parsed": date_parsed, + "body_text": body_text[:5000], # 限制长度 + "body_html": body_html[:5000], + "has_attachments": has_attachments, + "file_size": file_size, + } + + except Exception as e: + print(f"解析文件失败 {file_path}: {e}") + return None + + +# ============================================================================ +# Web 服务器 +# ============================================================================ + + +class EmlManagerHandler(BaseHTTPRequestHandler): + """EML 邮件管理器 HTTP 请求处理器.""" + + db: EmailDatabase | None = None + work_dir: Path | None = None + + def do_GET(self) -> None: + """处理 GET 请求.""" + parsed_path = urlparse(self.path) + path = parsed_path.path + query_params = parse_qs(parsed_path.query) + + if path == "/" or path == "/index.html": + self._serve_index() + elif path == "/test": + self._serve_test_page() + elif path == "/api/emails": + self._api_get_emails(query_params) + elif path == "/api/email": + self._api_get_email(query_params) + elif path == "/api/grouped": + self._api_get_grouped_emails() + elif path == "/api/count": + self._api_get_count() + elif path == "/api/status": + self._api_get_status() + else: + self.send_error(404, "Not Found") + + def do_POST(self) -> None: + """处理 POST 请求.""" + parsed_path = urlparse(self.path) + path = parsed_path.path + + if path == "/api/import": + self._api_import_emails() + elif path == "/api/clear": + self._api_clear_database() + else: + self.send_error(404, "Not Found") + + def _serve_index(self) -> None: + """返回主页 HTML.""" + html_content = self._get_html_template() + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Cache-Control", "no-cache, no-store, must-revalidate") + self.send_header("Pragma", "no-cache") + self.send_header("Expires", "0") + self.end_headers() + self.wfile.write(html_content.encode("utf-8")) + + def _serve_test_page(self) -> None: + """返回测试页面 HTML.""" + test_html = """ + +
+ + +请先打开目录导入邮件
+请选择左侧邮件查看详情
+