photo/functions.py

189 lines
5.3 KiB
Python

import os
import json
import numpy as np
import face_recognition
import piexif
from datetime import datetime
from werkzeug.utils import secure_filename
from PIL import Image
from models import db, User, Photo
# 目录路径
UPLOAD_FOLDER = 'static/uploads'
USER_FOLDER = os.path.join(UPLOAD_FOLDER, 'users')
PHOTO_FOLDER = os.path.join(UPLOAD_FOLDER, 'photos')
TEMP_FOLDER = os.path.join(UPLOAD_FOLDER, 'temp')
# 确保目录存在
for folder in [USER_FOLDER, PHOTO_FOLDER, TEMP_FOLDER]:
os.makedirs(folder, exist_ok=True)
def extract_face_encoding(photo_path):
"""提取人脸编码"""
image = face_recognition.load_image_file(photo_path)
encodings = face_recognition.face_encodings(image)
return encodings[0] if encodings else None
def save_with_exif(image_path, save_path):
"""保存图片并保留 EXIF 信息"""
image = Image.open(image_path)
exif_data = image.getexif()
if exif_data:
exif_bytes = piexif.dump(dict(exif_data))
image.save(save_path, exif=exif_bytes)
else:
image.save(save_path)
return exif_data
def format_exif_date(exif_data):
"""格式化 EXIF 时间为 YYYY-MM-DD"""
try:
return datetime.strptime(exif_data[306], "%Y:%m:%d %H:%M:%S").strftime("%Y-%m-%d")
except (KeyError, ValueError, TypeError):
return None # 处理无效时间
def add_user(username, userphoto):
"""添加用户并提取人脸编码"""
if not username or not userphoto:
return False
user_dir = os.path.join(USER_FOLDER, username)
os.makedirs(user_dir, exist_ok=True)
photo_path = os.path.join(user_dir, "photo.jpg")
userphoto.save(photo_path)
encoding = extract_face_encoding(photo_path)
if encoding is None:
return False
new_user = User(username=username, encoding=json.dumps(encoding.tolist()), photo=photo_path)
db.session.add(new_user)
db.session.commit()
return True
def edit_user(user_id, new_name, new_photo):
"""编辑用户信息"""
user = User.query.get(user_id)
if not user:
return False
user.username = new_name
if new_photo:
user_dir = os.path.join(USER_FOLDER, new_name)
os.makedirs(user_dir, exist_ok=True)
photo_path = os.path.join(user_dir, secure_filename(new_photo.filename))
new_photo.save(photo_path)
encoding = extract_face_encoding(photo_path)
if encoding:
user.encoding = json.dumps(encoding.tolist())
user.photo = photo_path
db.session.commit()
return True
import shutil
def delete_user(user_id):
"""删除用户及其所有相关文件"""
user = User.query.get(user_id)
if not user:
return False
user_folder = os.path.join("static", "uploads", "users", user.username)
user_folder = user_folder.replace("\\", "/")
# 删除整个用户文件夹及其内容
if os.path.exists(user_folder):
shutil.rmtree(user_folder)
db.session.delete(user)
db.session.commit()
return True
def match_face(photo_path, threshold=0.30):
"""匹配数据库中的人脸,一旦找到合适的直接返回"""
known_users = User.query.all()
if not known_users:
return None
unknown_encoding = extract_face_encoding(photo_path)
if unknown_encoding is None:
return None
for user in known_users:
if not user.encoding:
continue
try:
stored_encoding = np.array(json.loads(user.encoding))
if stored_encoding.size == 0 or stored_encoding.ndim != 1:
continue
except (json.JSONDecodeError, TypeError):
continue # 解析失败,跳过
if np.linalg.norm(stored_encoding - unknown_encoding) < threshold:
return user # 直接返回匹配的用户
return None
def classify_photos(photos):
"""分类照片"""
for photo in photos:
filename = secure_filename(photo.filename)
temp_photo_path = os.path.join(TEMP_FOLDER, filename)
temp_photo_path = temp_photo_path.replace("\\", "/")
exif_data = save_with_exif(photo, temp_photo_path)
date_str = format_exif_date(exif_data)
if not date_str:
os.remove(temp_photo_path) # 无效时间的照片直接删除
continue
matched_user = match_face(temp_photo_path)
if matched_user:
user_folder = os.path.join(USER_FOLDER, matched_user.username, date_str)
os.makedirs(user_folder, exist_ok=True)
classified_photo_path = os.path.join(user_folder, filename)
classified_photo_path =classified_photo_path.replace("\\", "/")
os.rename(temp_photo_path, classified_photo_path)
new_photo = Photo(
user_id=matched_user.id,
created_at=date_str,
classification=True,
classification_image_path=classified_photo_path
)
db.session.add(new_photo)
else:
os.remove(temp_photo_path) # 没匹配到的照片删除
db.session.commit()
def search_photos(username, date):
"""搜索用户的照片"""
user = User.query.filter_by(username=username).first()
if not user:
return []
query = Photo.query.filter_by(user_id=user.id)
if date:
query = query.filter(Photo.created_at.like(f"{date}%"))
return query.all()