refactor(emlmanager): 重构EML解析逻辑,提取公共方法并优化字符编码处理

1.  拆分邮件解析为多部分/单部分处理函数,抽离正文提取、日期解析逻辑
2.  完善字符编码检测与 fallback 处理,使用replace模式避免解码失败崩溃
3.  统一使用配置的最大正文长度限制,添加详细日志记录
4.  修复原代码中解码异常未妥善处理的问题
5.  优化测试用例,使用tmp_path替代固定临时目录提升测试稳定性
This commit is contained in:
2026-06-25 12:21:23 +08:00
parent 5c0f51e272
commit f8436f6b8c
2 changed files with 128 additions and 52 deletions
+126 -50
View File
@@ -11,6 +11,7 @@ import email
import gzip
import hashlib
import json
import logging
import sqlite3
import threading
from datetime import datetime
@@ -28,6 +29,11 @@ from urllib.parse import parse_qs, urlparse
DB_NAME = "eml_manager.db"
TABLE_NAME = "emails"
DEFAULT_PORT = 8080
MAX_BODY_LENGTH = 5000 # 邮件正文最大长度
# 配置日志
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)
# ============================================================================
@@ -216,13 +222,126 @@ def decode_mime_words(s: str) -> str:
return decoded_string
def _parse_email_date(date_str: str) -> str:
"""解析邮件日期."""
if not date_str:
return ""
try:
dt = parsedate_to_datetime(date_str)
return dt.isoformat()
except Exception:
return date_str
def _extract_email_body_part(part: Any) -> str:
"""提取邮件正文部分.
Args:
part: 邮件部分对象
Returns:
解码后的正文内容,限制在MAX_BODY_LENGTH长度内
Note:
- 自动检测字符编码,默认UTF-8
- 使用errors="replace"处理解码错误,避免丢失信息
- 捕获并记录所有异常,确保程序稳定性
"""
try:
# 解码邮件内容(自动处理Base64、Quoted-Printable等编码)
payload = part.get_payload(decode=True)
if not payload:
logger.warning("邮件部分无有效内容")
return ""
# 检测字符编码,优先使用邮件指定的编码
charset = part.get_content_charset()
if not charset:
# 尝试从Content-Type header中提取
charset = "utf-8"
logger.debug("未检测到字符编码,使用默认UTF-8")
# 解码并处理错误
try:
decoded_text = payload.decode(charset, errors="replace")
except (UnicodeDecodeError, LookupError) as decode_error:
# 如果指定编码失败,尝试常见编码
logger.warning(f"字符编码 {charset} 解码失败: {decode_error}")
for fallback_charset in ["utf-8", "gbk", "gb2312", "latin-1"]:
try:
decoded_text = payload.decode(fallback_charset, errors="replace")
logger.info(f"成功使用备用编码 {fallback_charset} 解码")
break
except (UnicodeDecodeError, LookupError):
continue
else:
# 所有编码都失败,使用latin-1(不会失败)
decoded_text = payload.decode("latin-1", errors="replace")
logger.warning("使用latin-1编码强制解码")
# 限制长度并返回
result = decoded_text[:MAX_BODY_LENGTH]
if len(decoded_text) > MAX_BODY_LENGTH:
logger.debug(f"正文内容过长,截取前{MAX_BODY_LENGTH}字符")
return result
except AttributeError as attr_error:
logger.error(f"邮件部分对象属性错误: {attr_error}")
return ""
except Exception as unexpected_error:
logger.error(f"提取邮件正文时发生未知错误: {unexpected_error}")
return ""
def _process_multipart_email(msg: Any) -> tuple[str, str, int]:
"""处理多部分邮件."""
body_text = ""
body_html = ""
has_attachments = 0
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition", ""))
# 检查附件
if "attachment" in content_disposition:
has_attachments = 1
continue
# 提取正文
if content_type == "text/plain" and not body_text:
body_text = _extract_email_body_part(part)
elif content_type == "text/html" and not body_html:
body_html = _extract_email_body_part(part)
return body_text, body_html, has_attachments
def _process_singlepart_email(msg: Any) -> tuple[str, str]:
"""处理单部分邮件."""
body_text = ""
body_html = ""
content_type = msg.get_content_type()
payload_text = _extract_email_body_part(msg)
if content_type == "text/plain":
body_text = payload_text
elif content_type == "text/html":
body_html = payload_text
return body_text, body_html
def parse_eml_file(file_path: Path) -> dict[str, Any] | None:
"""解析 EML 文件."""
try:
with open(file_path, "rb") as f:
msg = email.message_from_binary_file(f)
# 计算文件哈希
# 计算文件哈希和大小
file_hash = hashlib.md5(file_path.read_bytes()).hexdigest()
file_size = file_path.stat().st_size
@@ -231,57 +350,14 @@ def parse_eml_file(file_path: Path) -> dict[str, Any] | None:
sender = decode_mime_words(msg.get("From", ""))
recipients = decode_mime_words(msg.get("To", ""))
date_str = msg.get("Date", "")
# 解析日期
date_parsed = ""
if date_str:
try:
dt = parsedate_to_datetime(date_str)
date_parsed = dt.isoformat()
except Exception:
date_parsed = date_str
date_parsed = _parse_email_date(date_str)
# 提取正文
body_text = ""
body_html = ""
has_attachments = 0
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition", ""))
# 检查附件
if "attachment" in content_disposition:
has_attachments = 1
continue
# 提取正文
if content_type == "text/plain" and not body_text:
try:
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
body_text = payload.decode(charset, errors="ignore")
except Exception:
pass
elif content_type == "text/html" and not body_html:
try:
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
body_html = payload.decode(charset, errors="ignore")
except Exception:
pass
body_text, body_html, has_attachments = _process_multipart_email(msg)
else:
content_type = msg.get_content_type()
try:
payload = msg.get_payload(decode=True)
charset = msg.get_content_charset() or "utf-8"
if content_type == "text/plain":
body_text = payload.decode(charset, errors="ignore")
elif content_type == "text/html":
body_html = payload.decode(charset, errors="ignore")
except Exception:
pass
body_text, body_html = _process_singlepart_email(msg)
has_attachments = 0
return {
"file_path": str(file_path),
@@ -291,8 +367,8 @@ def parse_eml_file(file_path: Path) -> dict[str, Any] | None:
"recipients": recipients,
"date": date_str,
"date_parsed": date_parsed,
"body_text": body_text[:5000], # 限制长度
"body_html": body_html[:5000],
"body_text": body_text,
"body_html": body_html,
"has_attachments": has_attachments,
"file_size": file_size,
}
+2 -2
View File
@@ -60,9 +60,9 @@ class TestSetRustMirror:
assert cargo_dir.exists()
assert cargo_dir.is_dir()
def test_set_rust_mirror_prints_message(self, capsys: pytest.CaptureFixture[str]) -> None:
def test_set_rust_mirror_prints_message(self, tmp_path: Path, capsys: pytest.CaptureFixture[str]) -> None:
"""Should print mirror name."""
with patch.object(Path, "home", return_value=Path("/tmp")):
with patch.object(Path, "home", return_value=tmp_path):
envrs.set_rust_mirror("aliyun")
captured = capsys.readouterr()
assert "已设置 Rust 镜像源: aliyun" in captured.out