189 lines
5.3 KiB
Python
189 lines
5.3 KiB
Python
import os
|
|
import json
|
|
import numpy as np
|
|
import face_recognition
|
|
import piexif
|
|
from datetime import datetime
|
|
from werkzeug.utils import secure_filename
|
|
from PIL import Image
|
|
from models import db, User, Photo
|
|
|
|
# 目录路径
|
|
UPLOAD_FOLDER = 'static/uploads'
|
|
USER_FOLDER = os.path.join(UPLOAD_FOLDER, 'users')
|
|
PHOTO_FOLDER = os.path.join(UPLOAD_FOLDER, 'photos')
|
|
TEMP_FOLDER = os.path.join(UPLOAD_FOLDER, 'temp')
|
|
|
|
# 确保目录存在
|
|
for folder in [USER_FOLDER, PHOTO_FOLDER, TEMP_FOLDER]:
|
|
os.makedirs(folder, exist_ok=True)
|
|
|
|
|
|
def extract_face_encoding(photo_path):
|
|
"""提取人脸编码"""
|
|
image = face_recognition.load_image_file(photo_path)
|
|
encodings = face_recognition.face_encodings(image)
|
|
return encodings[0] if encodings else None
|
|
|
|
|
|
def save_with_exif(image_path, save_path):
|
|
"""保存图片并保留 EXIF 信息"""
|
|
image = Image.open(image_path)
|
|
exif_data = image.getexif()
|
|
if exif_data:
|
|
exif_bytes = piexif.dump(dict(exif_data))
|
|
image.save(save_path, exif=exif_bytes)
|
|
else:
|
|
image.save(save_path)
|
|
return exif_data
|
|
|
|
|
|
def format_exif_date(exif_data):
|
|
"""格式化 EXIF 时间为 YYYY-MM-DD"""
|
|
try:
|
|
return datetime.strptime(exif_data[306], "%Y:%m:%d %H:%M:%S").strftime("%Y-%m-%d")
|
|
except (KeyError, ValueError, TypeError):
|
|
return None # 处理无效时间
|
|
|
|
|
|
def add_user(username, userphoto):
|
|
"""添加用户并提取人脸编码"""
|
|
if not username or not userphoto:
|
|
return False
|
|
|
|
user_dir = os.path.join(USER_FOLDER, username)
|
|
os.makedirs(user_dir, exist_ok=True)
|
|
|
|
photo_path = os.path.join(user_dir, "photo.jpg")
|
|
userphoto.save(photo_path)
|
|
|
|
encoding = extract_face_encoding(photo_path)
|
|
if encoding is None:
|
|
return False
|
|
|
|
new_user = User(username=username, encoding=json.dumps(encoding.tolist()), photo=photo_path)
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
return True
|
|
|
|
|
|
def edit_user(user_id, new_name, new_photo):
|
|
"""编辑用户信息"""
|
|
user = User.query.get(user_id)
|
|
if not user:
|
|
return False
|
|
|
|
user.username = new_name
|
|
|
|
if new_photo:
|
|
user_dir = os.path.join(USER_FOLDER, new_name)
|
|
os.makedirs(user_dir, exist_ok=True)
|
|
|
|
photo_path = os.path.join(user_dir, secure_filename(new_photo.filename))
|
|
new_photo.save(photo_path)
|
|
|
|
encoding = extract_face_encoding(photo_path)
|
|
if encoding:
|
|
user.encoding = json.dumps(encoding.tolist())
|
|
user.photo = photo_path
|
|
|
|
db.session.commit()
|
|
return True
|
|
|
|
|
|
|
|
import shutil
|
|
|
|
def delete_user(user_id):
|
|
"""删除用户及其所有相关文件"""
|
|
user = User.query.get(user_id)
|
|
if not user:
|
|
return False
|
|
|
|
user_folder = os.path.join("static", "uploads", "users", user.username)
|
|
user_folder = user_folder.replace("\\", "/")
|
|
|
|
# 删除整个用户文件夹及其内容
|
|
if os.path.exists(user_folder):
|
|
shutil.rmtree(user_folder)
|
|
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
return True
|
|
|
|
|
|
|
|
def match_face(photo_path, threshold=0.30):
|
|
"""匹配数据库中的人脸,一旦找到合适的直接返回"""
|
|
known_users = User.query.all()
|
|
if not known_users:
|
|
return None
|
|
|
|
unknown_encoding = extract_face_encoding(photo_path)
|
|
if unknown_encoding is None:
|
|
return None
|
|
|
|
for user in known_users:
|
|
if not user.encoding:
|
|
continue
|
|
|
|
try:
|
|
stored_encoding = np.array(json.loads(user.encoding))
|
|
if stored_encoding.size == 0 or stored_encoding.ndim != 1:
|
|
continue
|
|
except (json.JSONDecodeError, TypeError):
|
|
continue # 解析失败,跳过
|
|
|
|
if np.linalg.norm(stored_encoding - unknown_encoding) < threshold:
|
|
return user # 直接返回匹配的用户
|
|
|
|
return None
|
|
|
|
|
|
def classify_photos(photos):
|
|
"""分类照片"""
|
|
for photo in photos:
|
|
filename = secure_filename(photo.filename)
|
|
temp_photo_path = os.path.join(TEMP_FOLDER, filename)
|
|
temp_photo_path = temp_photo_path.replace("\\", "/")
|
|
|
|
exif_data = save_with_exif(photo, temp_photo_path)
|
|
date_str = format_exif_date(exif_data)
|
|
if not date_str:
|
|
os.remove(temp_photo_path) # 无效时间的照片直接删除
|
|
continue
|
|
|
|
matched_user = match_face(temp_photo_path)
|
|
if matched_user:
|
|
user_folder = os.path.join(USER_FOLDER, matched_user.username, date_str)
|
|
os.makedirs(user_folder, exist_ok=True)
|
|
|
|
classified_photo_path = os.path.join(user_folder, filename)
|
|
classified_photo_path =classified_photo_path.replace("\\", "/")
|
|
os.rename(temp_photo_path, classified_photo_path)
|
|
|
|
new_photo = Photo(
|
|
user_id=matched_user.id,
|
|
created_at=date_str,
|
|
classification=True,
|
|
classification_image_path=classified_photo_path
|
|
)
|
|
db.session.add(new_photo)
|
|
else:
|
|
os.remove(temp_photo_path) # 没匹配到的照片删除
|
|
|
|
db.session.commit()
|
|
|
|
|
|
def search_photos(username, date):
|
|
"""搜索用户的照片"""
|
|
user = User.query.filter_by(username=username).first()
|
|
if not user:
|
|
return []
|
|
|
|
query = Photo.query.filter_by(user_id=user.id)
|
|
if date:
|
|
query = query.filter(Photo.created_at.like(f"{date}%"))
|
|
|
|
return query.all()
|