import os
import sqlite3
import time
from contextlib import closing
from datetime import datetime

import cv2
import face_recognition
import numpy as np

# Open the default camera (device index 0).
cap = cv2.VideoCapture(0)
# Number of face crops saved per capture burst.
max_photos = 10

# Directory that receives the captured face crops.
save_path = "./captured_faces"
os.makedirs(save_path, exist_ok=True)


def create_face_database(db_name="face_database.db"):
    """Create the ``faces`` and ``match_logs`` tables if they do not exist.

    Both statements use ``CREATE TABLE IF NOT EXISTS``, so calling this on an
    already-initialised database is a no-op.

    Args:
        db_name: Path of the SQLite database file.
    """
    # ``closing`` guarantees the connection is released even if SQL fails.
    with closing(sqlite3.connect(db_name)) as conn:
        conn.execute(
            """CREATE TABLE IF NOT EXISTS faces (
                   id INTEGER PRIMARY KEY AUTOINCREMENT,
                   name TEXT NOT NULL,
                   identity TEXT NOT NULL,
                   image_path TEXT NOT NULL,
                   encoding BLOB NOT NULL)"""
        )
        conn.execute(
            """CREATE TABLE IF NOT EXISTS match_logs (
                   id INTEGER PRIMARY KEY AUTOINCREMENT,
                   name TEXT NOT NULL,
                   identity TEXT NOT NULL,
                   image_path TEXT NOT NULL,
                   match_time TEXT NOT NULL)"""
        )
        conn.commit()


def add_face_to_database(name, identity, image_path, db_name="face_database.db"):
    """Encode the first face found in an image and store it in ``faces``.

    The image path is stored relative to ``./static``. The encoding is stored
    as raw float64 bytes, which is the layout ``match_faces`` reads back.

    Args:
        name: Person's display name.
        identity: Free-form identity/role label.
        image_path: Path of the reference photo to encode.
        db_name: Path of the SQLite database file.

    Returns:
        True if a face was found and a row was inserted, False if the image
        contained no detectable face (nothing is written in that case).
    """
    image = face_recognition.load_image_file(image_path)
    face_encodings = face_recognition.face_encodings(image)
    if not face_encodings:
        # Previously this case was skipped silently; report it instead.
        print(f"没有检测到人脸:{image_path}")
        return False

    # Store the path relative to ./static.
    relative_image_path = os.path.relpath(image_path, start="./static")
    # Pin the dtype so the stored bytes always match the float64 reader.
    encoding_blob = np.asarray(face_encodings[0], dtype=np.float64).tobytes()

    with closing(sqlite3.connect(db_name)) as conn:
        conn.execute(
            "INSERT INTO faces (name, identity, image_path, encoding) VALUES (?, ?, ?, ?)",
            (name, identity, relative_image_path, encoding_blob),
        )
        conn.commit()
    return True


def match_faces(captured_images, db_name="face_database.db", tolerance=0.4, log_file="match_log.txt"):
    """Compare captured images against every known face in the database.

    Only the first face encoding of each captured image is compared. The
    search stops at the first match, which is recorded through ``log_match``.

    Args:
        captured_images: Iterable of image file paths to check.
        db_name: Path of the SQLite database file.
        tolerance: Distance threshold passed to
            ``face_recognition.compare_faces``.
        log_file: Path of the text log that ``log_match`` appends to.

    Returns:
        True as soon as any captured image matches a known face, else False.
    """
    # Read everything up front and close the connection immediately; nothing
    # in this function writes to the database.
    with closing(sqlite3.connect(db_name)) as conn:
        rows = conn.execute(
            "SELECT name, identity, image_path, encoding FROM faces"
        ).fetchall()

    # Decode the stored encodings once rather than once per captured image.
    known_faces = [
        (name, identity, db_image_path, np.frombuffer(encoding_blob, dtype=np.float64))
        for name, identity, db_image_path, encoding_blob in rows
    ]

    for image_path in captured_images:
        unknown_image = face_recognition.load_image_file(image_path)
        face_encodings = face_recognition.face_encodings(unknown_image)
        if not face_encodings:
            print(f"没有检测到人脸:{image_path}")
            continue

        unknown_encoding = face_encodings[0]
        for name, identity, db_image_path, known_encoding in known_faces:
            match = face_recognition.compare_faces(
                [known_encoding], unknown_encoding, tolerance=tolerance
            )
            if match[0]:
                print(f"匹配成功: {name} ({identity}) 在 {image_path}")
                # Log the reference image path stored in the database, not
                # the path of the freshly captured crop.
                log_match(name, identity, db_image_path, db_name, log_file)
                return True

        print(f"未发现匹配: 在 {image_path} 中的任何已知人脸")

    return False


def log_match(name, identity, db_image_path, db_name, log_file):
    """Record a successful match in the text log and the ``match_logs`` table.

    The text log receives ``name,identity,image_path``; the table row
    additionally stores the current local time as ``YYYY-MM-DD HH:MM:SS``.

    Args:
        name: Matched person's name.
        identity: Matched person's identity label.
        db_image_path: Reference image path stored for that person.
        db_name: Path of the SQLite database file.
        log_file: Path of the text log to append to.
    """
    match_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Explicit UTF-8: names may be non-ASCII and the platform default
    # encoding is not guaranteed to be able to represent them.
    with open(log_file, "a", encoding="utf-8") as log:
        log.write(f"{name},{identity},{db_image_path}\n")

    with closing(sqlite3.connect(db_name)) as conn:
        conn.execute(
            "INSERT INTO match_logs (name, identity, image_path, match_time) VALUES (?, ?, ?, ?)",
            (name, identity, db_image_path, match_time),
        )
        conn.commit()


def _to_rgb(frame):
    """Return a contiguous RGB copy of a BGR frame read from OpenCV."""
    # ``frame[:, :, ::-1]`` is only a negative-stride view of the BGR data;
    # cvtColor produces a real, contiguous RGB array for the detector.
    return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)


def _capture_face_photos(capture, limit, out_dir):
    """Save up to ``limit`` face crops from the camera and return their paths.

    Stops early if the camera stops delivering frames or the user presses
    ``q`` in the preview window.
    """
    captured = []
    while len(captured) < limit:
        ret, frame = capture.read()
        if not ret:
            break

        face_locations = face_recognition.face_locations(_to_rgb(frame))

        # Write the crops BEFORE drawing on the frame, otherwise the green
        # preview rectangles end up inside the saved face images.
        for top, right, bottom, left in face_locations:
            if len(captured) >= limit:
                break
            face_image = frame[top:bottom, left:right]
            if face_image.size == 0:
                # An empty crop cannot be encoded as an image; skip it.
                continue
            image_path = os.path.join(out_dir, f"face_{len(captured) + 1}.jpg")
            cv2.imwrite(image_path, face_image)
            captured.append(image_path)

        # Preview overlay only; the saved crops are already on disk.
        for top, right, bottom, left in face_locations:
            cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)

        cv2.imshow("Capturing Faces", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    return captured


# Make sure the tables exist before the first match attempt (idempotent).
create_face_database()

# One-time setup: register known faces before running the capture loop, e.g.
# add_face_to_database("李四", "居民", "./static/db_image/test1.jpg")
# add_face_to_database("张三", "居民", "./static/db_image/test2.jpg")
# add_face_to_database("王五", "居民", "./static/db_image/test3.jpg")

# Main loop: wait for a face, capture a burst of crops, match them against
# the database, then pause before watching again.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        if not face_recognition.face_locations(_to_rgb(frame)):
            continue

        print("检测到人脸,开始抓拍...")
        captured_images = _capture_face_photos(cap, max_photos, save_path)
        cv2.destroyAllWindows()

        if match_faces(captured_images):
            print("匹配成功")
        else:
            print("没有匹配")

        print("等待30秒后继续...")
        time.sleep(30)
finally:
    # Release the camera and close windows even on Ctrl+C or an error.
    cap.release()
    cv2.destroyAllWindows()