import os import json import numpy as np import face_recognition import piexif from datetime import datetime from werkzeug.utils import secure_filename from PIL import Image from models import db, User, Photo # 目录路径 UPLOAD_FOLDER = 'static/uploads' USER_FOLDER = os.path.join(UPLOAD_FOLDER, 'users') PHOTO_FOLDER = os.path.join(UPLOAD_FOLDER, 'photos') TEMP_FOLDER = os.path.join(UPLOAD_FOLDER, 'temp') # 确保目录存在 for folder in [USER_FOLDER, PHOTO_FOLDER, TEMP_FOLDER]: os.makedirs(folder, exist_ok=True) def extract_face_encoding(photo_path): """提取人脸编码""" image = face_recognition.load_image_file(photo_path) encodings = face_recognition.face_encodings(image) return encodings[0] if encodings else None def save_with_exif(image_path, save_path): """保存图片并保留 EXIF 信息""" image = Image.open(image_path) exif_data = image.getexif() if exif_data: exif_bytes = piexif.dump(dict(exif_data)) image.save(save_path, exif=exif_bytes) else: image.save(save_path) return exif_data def format_exif_date(exif_data): """格式化 EXIF 时间为 YYYY-MM-DD""" try: return datetime.strptime(exif_data[306], "%Y:%m:%d %H:%M:%S").strftime("%Y-%m-%d") except (KeyError, ValueError, TypeError): return None # 处理无效时间 def add_user(username, userphoto): """添加用户并提取人脸编码""" if not username or not userphoto: return False user_dir = os.path.join(USER_FOLDER, username) os.makedirs(user_dir, exist_ok=True) photo_path = os.path.join(user_dir, "photo.jpg") userphoto.save(photo_path) encoding = extract_face_encoding(photo_path) if encoding is None: return False new_user = User(username=username, encoding=json.dumps(encoding.tolist()), photo=photo_path) db.session.add(new_user) db.session.commit() return True def edit_user(user_id, new_name, new_photo): """编辑用户信息""" user = User.query.get(user_id) if not user: return False user.username = new_name if new_photo: user_dir = os.path.join(USER_FOLDER, new_name) os.makedirs(user_dir, exist_ok=True) photo_path = os.path.join(user_dir, secure_filename(new_photo.filename)) new_photo.save(photo_path) encoding = extract_face_encoding(photo_path) if encoding: user.encoding = json.dumps(encoding.tolist()) user.photo = photo_path db.session.commit() return True import shutil def delete_user(user_id): """删除用户及其所有相关文件""" user = User.query.get(user_id) if not user: return False user_folder = os.path.join("static", "uploads", "users", user.username) user_folder = user_folder.replace("\\", "/") # 删除整个用户文件夹及其内容 if os.path.exists(user_folder): shutil.rmtree(user_folder) db.session.delete(user) db.session.commit() return True def match_face(photo_path, threshold=0.30): """匹配数据库中的人脸,一旦找到合适的直接返回""" known_users = User.query.all() if not known_users: return None unknown_encoding = extract_face_encoding(photo_path) if unknown_encoding is None: return None for user in known_users: if not user.encoding: continue try: stored_encoding = np.array(json.loads(user.encoding)) if stored_encoding.size == 0 or stored_encoding.ndim != 1: continue except (json.JSONDecodeError, TypeError): continue # 解析失败,跳过 if np.linalg.norm(stored_encoding - unknown_encoding) < threshold: return user # 直接返回匹配的用户 return None def classify_photos(photos): """分类照片""" for photo in photos: filename = secure_filename(photo.filename) temp_photo_path = os.path.join(TEMP_FOLDER, filename) temp_photo_path = temp_photo_path.replace("\\", "/") exif_data = save_with_exif(photo, temp_photo_path) date_str = format_exif_date(exif_data) if not date_str: os.remove(temp_photo_path) # 无效时间的照片直接删除 continue matched_user = match_face(temp_photo_path) if matched_user: user_folder = os.path.join(USER_FOLDER, matched_user.username, date_str) os.makedirs(user_folder, exist_ok=True) classified_photo_path = os.path.join(user_folder, filename) classified_photo_path =classified_photo_path.replace("\\", "/") os.rename(temp_photo_path, classified_photo_path) new_photo = Photo( user_id=matched_user.id, created_at=date_str, classification=True, classification_image_path=classified_photo_path ) db.session.add(new_photo) else: os.remove(temp_photo_path) # 没匹配到的照片删除 db.session.commit() def search_photos(username, date): """搜索用户的照片""" user = User.query.filter_by(username=username).first() if not user: return [] query = Photo.query.filter_by(user_id=user.id) if date: query = query.filter(Photo.created_at.like(f"{date}%")) return query.all()