This commit is contained in:
2026-06-25 12:12:04 +08:00
parent 4e3622ef02
commit 5c0f51e272
+70 -22
View File
@@ -8,6 +8,7 @@ from __future__ import annotations
import argparse
import email
import gzip
import hashlib
import json
import sqlite3
@@ -15,7 +16,7 @@ import threading
from datetime import datetime
from email.header import decode_header
from email.utils import parsedate_to_datetime
from http.server import BaseHTTPRequestHandler, HTTPServer
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.parse import parse_qs, urlparse
@@ -112,29 +113,35 @@ class EmailDatabase:
except sqlite3.Error:
return False
def search_emails(self, keyword: str = "", field: str = "all") -> list[dict[str, Any]]:
def search_emails(
self, keyword: str = "", field: str = "all", limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]:
"""搜索邮件."""
with self._lock:
cursor = self.conn.cursor()
# 只返回必要字段,减少数据量
select_fields = "id, subject, sender, date_parsed, has_attachments, file_size"
if not keyword:
cursor.execute(f"SELECT * FROM {TABLE_NAME} ORDER BY date_parsed DESC")
query = f"SELECT {select_fields} FROM {TABLE_NAME} ORDER BY date_parsed DESC LIMIT ? OFFSET ?"
cursor.execute(query, (limit, offset))
elif field == "subject":
query = f"SELECT * FROM {TABLE_NAME} WHERE subject LIKE ? ORDER BY date_parsed DESC"
cursor.execute(query, (f"%{keyword}%",))
query = f"SELECT {select_fields} FROM {TABLE_NAME} WHERE subject LIKE ? ORDER BY date_parsed DESC LIMIT ? OFFSET ?"
cursor.execute(query, (f"%{keyword}%", limit, offset))
elif field == "sender":
query = f"SELECT * FROM {TABLE_NAME} WHERE sender LIKE ? ORDER BY date_parsed DESC"
cursor.execute(query, (f"%{keyword}%",))
query = f"SELECT {select_fields} FROM {TABLE_NAME} WHERE sender LIKE ? ORDER BY date_parsed DESC LIMIT ? OFFSET ?"
cursor.execute(query, (f"%{keyword}%", limit, offset))
elif field == "recipients":
query = f"SELECT * FROM {TABLE_NAME} WHERE recipients LIKE ? ORDER BY date_parsed DESC"
cursor.execute(query, (f"%{keyword}%",))
query = f"SELECT {select_fields} FROM {TABLE_NAME} WHERE recipients LIKE ? ORDER BY date_parsed DESC LIMIT ? OFFSET ?"
cursor.execute(query, (f"%{keyword}%", limit, offset))
else: # all
query = f"""
SELECT * FROM {TABLE_NAME}
SELECT {select_fields} FROM {TABLE_NAME}
WHERE subject LIKE ? OR sender LIKE ? OR recipients LIKE ? OR body_text LIKE ?
ORDER BY date_parsed DESC
ORDER BY date_parsed DESC LIMIT ? OFFSET ?
"""
cursor.execute(query, (f"%{keyword}%", f"%{keyword}%", f"%{keyword}%", f"%{keyword}%"))
cursor.execute(query, (f"%{keyword}%", f"%{keyword}%", f"%{keyword}%", f"%{keyword}%", limit, offset))
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
@@ -312,7 +319,7 @@ class EmlManagerHandler(BaseHTTPRequestHandler):
path = parsed_path.path
query_params = parse_qs(parsed_path.query)
if path == "/" or path == "/index.html":
if path in {"/", "/index.html"}:
self._serve_index()
elif path == "/test":
self._serve_test_page()
@@ -344,13 +351,28 @@ class EmlManagerHandler(BaseHTTPRequestHandler):
def _serve_index(self) -> None:
"""返回主页 HTML."""
html_content = self._get_html_template()
html_bytes = html_content.encode("utf-8")
# 检查客户端是否支持gzip压缩
accept_encoding = self.headers.get("Accept-Encoding", "")
if "gzip" in accept_encoding.lower():
# 使用gzip压缩
compressed = gzip.compress(html_bytes)
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Cache-Control", "no-cache, no-store, must-revalidate")
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
self.send_header("Content-Encoding", "gzip")
self.send_header("Content-Length", str(len(compressed)))
self.send_header("Cache-Control", "public, max-age=3600")
self.end_headers()
self.wfile.write(html_content.encode("utf-8"))
self.wfile.write(compressed)
else:
# 不压缩
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(html_bytes)))
self.send_header("Cache-Control", "public, max-age=3600")
self.end_headers()
self.wfile.write(html_bytes)
def _serve_test_page(self) -> None:
"""返回测试页面 HTML."""
@@ -454,9 +476,18 @@ class EmlManagerHandler(BaseHTTPRequestHandler):
keyword = query_params.get("keyword", [""])[0]
field = query_params.get("field", ["all"])[0]
limit = int(query_params.get("limit", ["100"])[0]) # 默认返回100条
offset = int(query_params.get("offset", ["0"])[0]) # 默认偏移0
emails = self.db.search_emails(keyword, field)
self._send_json_response({"emails": emails, "count": len(emails)})
emails = self.db.search_emails(keyword, field, limit, offset)
total_count = self.db.get_email_count()
self._send_json_response({
"emails": emails,
"count": len(emails),
"total": total_count,
"limit": limit,
"offset": offset,
})
def _api_get_email(self, query_params: dict[str, list[str]]) -> None:
"""API: 获取单个邮件详情."""
@@ -561,11 +592,28 @@ class EmlManagerHandler(BaseHTTPRequestHandler):
def _send_json_response(self, data: dict[str, Any], status_code: int = 200) -> None:
"""发送 JSON 响应."""
json_bytes = json.dumps(data, ensure_ascii=False).encode("utf-8")
# 检查客户端是否支持gzip压缩
accept_encoding = self.headers.get("Accept-Encoding", "")
if "gzip" in accept_encoding.lower():
# 使用gzip压缩
compressed = gzip.compress(json_bytes)
self.send_response(status_code)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Encoding", "gzip")
self.send_header("Content-Length", str(len(compressed)))
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(json.dumps(data, ensure_ascii=False).encode("utf-8"))
self.wfile.write(compressed)
else:
# 不压缩
self.send_response(status_code)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(json_bytes)))
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(json_bytes)
def _get_html_template(self) -> str:
"""获取 HTML 模板."""
@@ -1168,9 +1216,9 @@ def main() -> None:
else:
print("警告: 未指定工作目录,请在Web界面中手动导入")
# 启动服务器
# 启动多线程服务器
server_address = ("", args.port)
httpd = HTTPServer(server_address, EmlManagerHandler)
httpd = ThreadingHTTPServer(server_address, EmlManagerHandler)
print("EML 邮件管理器 Web 版已启动")
print(f"访问地址: http://localhost:{args.port}")