diff --git a/src/pyflowx/cli/emlmanager.py b/src/pyflowx/cli/emlmanager.py index 110734d..a4ac192 100644 --- a/src/pyflowx/cli/emlmanager.py +++ b/src/pyflowx/cli/emlmanager.py @@ -11,6 +11,7 @@ import email import gzip import hashlib import json +import logging import sqlite3 import threading from datetime import datetime @@ -28,6 +29,11 @@ from urllib.parse import parse_qs, urlparse DB_NAME = "eml_manager.db" TABLE_NAME = "emails" DEFAULT_PORT = 8080 +MAX_BODY_LENGTH = 5000 # 邮件正文最大长度 + +# 配置日志 +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +logger = logging.getLogger(__name__) # ============================================================================ @@ -216,13 +222,126 @@ def decode_mime_words(s: str) -> str: return decoded_string +def _parse_email_date(date_str: str) -> str: + """解析邮件日期.""" + if not date_str: + return "" + + try: + dt = parsedate_to_datetime(date_str) + return dt.isoformat() + except Exception: + return date_str + + +def _extract_email_body_part(part: Any) -> str: + """提取邮件正文部分. + + Args: + part: 邮件部分对象 + + Returns: + 解码后的正文内容,限制在MAX_BODY_LENGTH长度内 + + Note: + - 自动检测字符编码,默认UTF-8 + - 使用errors="replace"处理解码错误,避免丢失信息 + - 捕获并记录所有异常,确保程序稳定性 + """ + try: + # 解码邮件内容(自动处理Base64、Quoted-Printable等编码) + payload = part.get_payload(decode=True) + if not payload: + logger.warning("邮件部分无有效内容") + return "" + + # 检测字符编码,优先使用邮件指定的编码 + charset = part.get_content_charset() + if not charset: + # 尝试从Content-Type header中提取 + charset = "utf-8" + logger.debug("未检测到字符编码,使用默认UTF-8") + + # 解码并处理错误 + try: + decoded_text = payload.decode(charset, errors="replace") + except (UnicodeDecodeError, LookupError) as decode_error: + # 如果指定编码失败,尝试常见编码 + logger.warning(f"字符编码 {charset} 解码失败: {decode_error}") + for fallback_charset in ["utf-8", "gbk", "gb2312", "latin-1"]: + try: + decoded_text = payload.decode(fallback_charset, errors="replace") + logger.info(f"成功使用备用编码 {fallback_charset} 解码") + break + except (UnicodeDecodeError, LookupError): + continue + else: + # 所有编码都失败,使用latin-1(不会失败) + decoded_text = payload.decode("latin-1", errors="replace") + logger.warning("使用latin-1编码强制解码") + + # 限制长度并返回 + result = decoded_text[:MAX_BODY_LENGTH] + if len(decoded_text) > MAX_BODY_LENGTH: + logger.debug(f"正文内容过长,截取前{MAX_BODY_LENGTH}字符") + + return result + + except AttributeError as attr_error: + logger.error(f"邮件部分对象属性错误: {attr_error}") + return "" + except Exception as unexpected_error: + logger.error(f"提取邮件正文时发生未知错误: {unexpected_error}") + return "" + + +def _process_multipart_email(msg: Any) -> tuple[str, str, int]: + """处理多部分邮件.""" + body_text = "" + body_html = "" + has_attachments = 0 + + for part in msg.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition", "")) + + # 检查附件 + if "attachment" in content_disposition: + has_attachments = 1 + continue + + # 提取正文 + if content_type == "text/plain" and not body_text: + body_text = _extract_email_body_part(part) + elif content_type == "text/html" and not body_html: + body_html = _extract_email_body_part(part) + + return body_text, body_html, has_attachments + + +def _process_singlepart_email(msg: Any) -> tuple[str, str]: + """处理单部分邮件.""" + body_text = "" + body_html = "" + + content_type = msg.get_content_type() + payload_text = _extract_email_body_part(msg) + + if content_type == "text/plain": + body_text = payload_text + elif content_type == "text/html": + body_html = payload_text + + return body_text, body_html + + def parse_eml_file(file_path: Path) -> dict[str, Any] | None: """解析 EML 文件.""" try: with open(file_path, "rb") as f: msg = email.message_from_binary_file(f) - # 计算文件哈希 + # 计算文件哈希和大小 file_hash = hashlib.md5(file_path.read_bytes()).hexdigest() file_size = file_path.stat().st_size @@ -231,57 +350,14 @@ def parse_eml_file(file_path: Path) -> dict[str, Any] | None: sender = decode_mime_words(msg.get("From", "")) recipients = decode_mime_words(msg.get("To", "")) date_str = msg.get("Date", "") - - # 解析日期 - date_parsed = "" - if date_str: - try: - dt = parsedate_to_datetime(date_str) - date_parsed = dt.isoformat() - except Exception: - date_parsed = date_str + date_parsed = _parse_email_date(date_str) # 提取正文 - body_text = "" - body_html = "" - has_attachments = 0 - if msg.is_multipart(): - for part in msg.walk(): - content_type = part.get_content_type() - content_disposition = str(part.get("Content-Disposition", "")) - - # 检查附件 - if "attachment" in content_disposition: - has_attachments = 1 - continue - - # 提取正文 - if content_type == "text/plain" and not body_text: - try: - payload = part.get_payload(decode=True) - charset = part.get_content_charset() or "utf-8" - body_text = payload.decode(charset, errors="ignore") - except Exception: - pass - elif content_type == "text/html" and not body_html: - try: - payload = part.get_payload(decode=True) - charset = part.get_content_charset() or "utf-8" - body_html = payload.decode(charset, errors="ignore") - except Exception: - pass + body_text, body_html, has_attachments = _process_multipart_email(msg) else: - content_type = msg.get_content_type() - try: - payload = msg.get_payload(decode=True) - charset = msg.get_content_charset() or "utf-8" - if content_type == "text/plain": - body_text = payload.decode(charset, errors="ignore") - elif content_type == "text/html": - body_html = payload.decode(charset, errors="ignore") - except Exception: - pass + body_text, body_html = _process_singlepart_email(msg) + has_attachments = 0 return { "file_path": str(file_path), @@ -291,8 +367,8 @@ def parse_eml_file(file_path: Path) -> dict[str, Any] | None: "recipients": recipients, "date": date_str, "date_parsed": date_parsed, - "body_text": body_text[:5000], # 限制长度 - "body_html": body_html[:5000], + "body_text": body_text, + "body_html": body_html, "has_attachments": has_attachments, "file_size": file_size, } diff --git a/tests/cli/test_envrs.py b/tests/cli/test_envrs.py index 0a2fc2d..c3f4d93 100644 --- a/tests/cli/test_envrs.py +++ b/tests/cli/test_envrs.py @@ -60,9 +60,9 @@ class TestSetRustMirror: assert cargo_dir.exists() assert cargo_dir.is_dir() - def test_set_rust_mirror_prints_message(self, capsys: pytest.CaptureFixture[str]) -> None: + def test_set_rust_mirror_prints_message(self, tmp_path: Path, capsys: pytest.CaptureFixture[str]) -> None: """Should print mirror name.""" - with patch.object(Path, "home", return_value=Path("/tmp")): + with patch.object(Path, "home", return_value=tmp_path): envrs.set_rust_mirror("aliyun") captured = capsys.readouterr() assert "已设置 Rust 镜像源: aliyun" in captured.out