import os
import sqlite3

import eventlet
import face_recognition
import numpy as np
from flask import Flask, render_template, request, redirect, url_for, flash
from flask_socketio import SocketIO, emit

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = './static/db_image'  # Directory where uploaded face images are saved
# NOTE(review): the session secret is hard-coded in source control; consider
# loading it from configuration or the environment instead.
app.secret_key = 'TG1pSXF40uyHMu0OVrVt4ZsaLLqGekElefvewbI1hzD9UZLV71efR60trjAtsRXg'  # Set the secret key for session management
socketio = SocketIO(app, async_mode='eventlet')


def create_face_database(db_name="face_database.db"):
    """Create the ``faces`` and ``match_logs`` tables if they do not exist.

    Args:
        db_name: Path of the SQLite database file.
    """
    conn = sqlite3.connect(db_name)
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS faces
                     (id INTEGER PRIMARY KEY AUTOINCREMENT,
                      name TEXT NOT NULL,
                      identity TEXT NOT NULL,
                      image_path TEXT NOT NULL,
                      encoding BLOB NOT NULL)''')
        c.execute('''CREATE TABLE IF NOT EXISTS match_logs
                     (id INTEGER PRIMARY KEY AUTOINCREMENT,
                      name TEXT NOT NULL,
                      identity TEXT NOT NULL,
                      image_path TEXT NOT NULL,
                      match_time TEXT NOT NULL)''')
        conn.commit()
    finally:
        # Close the connection even if one of the statements fails.
        conn.close()


def add_face_to_database(name, identity, image_path, db_name="face_database.db"):
    """Compute the face encoding of an image and store it in the ``faces`` table.

    Args:
        name: Person's name.
        identity: Person's identity string.
        image_path: Path of the image file on disk.
        db_name: Path of the SQLite database file.

    Raises:
        ValueError: If no face is detected in the image. Previously this case
            silently inserted nothing, so callers reported a success that
            never happened.
    """
    # Store the image path relative to the static folder.
    relative_image_path = os.path.relpath(image_path, start='./static')

    image = face_recognition.load_image_file(image_path)
    face_encodings = face_recognition.face_encodings(image)
    if not face_encodings:
        raise ValueError("No face detected in the uploaded image.")

    # Only the first detected face is stored.
    encoding_blob = np.array(face_encodings[0]).tobytes()

    # Open the connection only once there is something to insert, so a
    # failure while loading/encoding the image cannot leak it.
    conn = sqlite3.connect(db_name)
    try:
        conn.execute(
            "INSERT INTO faces (name, identity, image_path, encoding) VALUES (?, ?, ?, ?)",
            (name, identity, relative_image_path, encoding_blob),
        )
        conn.commit()
    finally:
        conn.close()


def get_match_logs(db_name="face_database.db"):
    """Return all rows of ``match_logs`` as (name, identity, image_path, match_time) tuples.

    Args:
        db_name: Path of the SQLite database file.
    """
    conn = sqlite3.connect(db_name)
    try:
        c = conn.cursor()
        c.execute("SELECT name, identity, image_path, match_time FROM match_logs")
        return c.fetchall()
    finally:
        conn.close()


@app.route('/')
def index():
    """Render the index page with the current match logs."""
    logs = get_match_logs()
    return render_template('index.html', logs=logs)


def _safe_upload_name(filename):
    """Return the bare file name of an upload, or '' if it is unusable.

    The name is supplied by the client, so directory components (with either
    separator) are stripped to keep the saved file inside the upload folder.
    """
    name = os.path.basename((filename or '').replace('\\', '/'))
    return '' if name in ('', '.', '..') else name


def _save_upload(image):
    """Save an uploaded file into the upload folder.

    Returns:
        A ``(disk_path, filename)`` tuple, or ``None`` when the upload has no
        usable file name.
    """
    filename = _safe_upload_name(image.filename)
    if not filename:
        return None
    # The upload folder may not exist yet on a fresh checkout.
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    disk_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    image.save(disk_path)
    return disk_path, filename


def _handle_add():
    """Process the 'add' form: save the image and register the face."""
    name = request.form['name']
    identity = request.form['identity']
    image = request.files['image_path']
    saved = _save_upload(image) if image else None
    if saved is None:
        flash('No image uploaded.')
    else:
        image_path, _ = saved
        try:
            add_face_to_database(name, identity, image_path)
            flash('Face added successfully!')
        except Exception as e:
            # Route boundary: report any failure to the user instead of a 500.
            flash(f'Error adding face: {e}')
    return redirect(url_for('info_person'))


def _handle_update(cursor):
    """Process the 'update' form using *cursor*.

    Returns:
        True if an UPDATE statement was issued, False if the request was
        rejected (an explanatory message has been flashed in that case).
    """
    face_id = request.form['id']
    name = request.form['name']
    identity = request.form['identity']
    image = request.files.get('image_path')

    if not image:
        # If no new image is uploaded, keep existing image path and encoding
        cursor.execute("UPDATE faces SET name=?, identity=? WHERE id=?",
                       (name, identity, face_id))
        flash('Information updated successfully!')
        return True

    saved = _save_upload(image)
    if saved is None:
        flash('Invalid image file name.')
        return False
    disk_path, filename = saved

    # Load the new image and generate new face encoding
    try:
        loaded_image = face_recognition.load_image_file(disk_path)
        face_encodings = face_recognition.face_encodings(loaded_image)
    except Exception as e:
        flash(f'Error processing image: {e}')
        return False
    if not face_encodings:
        flash("No face detected in the uploaded image.")
        return False

    encoding = np.array(face_encodings[0]).tobytes()
    new_image_path = "db_image/" + filename  # Relative path for DB

    # Update record in the database
    cursor.execute(
        "UPDATE faces SET name=?, identity=?, image_path=?, encoding=? WHERE id=?",
        (name, identity, new_image_path, encoding, face_id),
    )
    flash('Face updated successfully!')
    return True


@app.route('/info_person', methods=['GET', 'POST'])
def info_person():
    """List registered faces and handle the add / update / delete forms.

    The submitted form is identified by the presence of an ``add``,
    ``update`` or ``delete`` field. 'add' and rejected updates redirect back
    to this page; otherwise the page is rendered with all rows of ``faces``.
    """
    is_post = request.method == 'POST'
    if is_post and 'add' in request.form:
        # 'add' uses its own connection inside add_face_to_database.
        return _handle_add()

    conn = sqlite3.connect('face_database.db')
    try:
        c = conn.cursor()
        if is_post:
            if 'update' in request.form:
                if not _handle_update(c):
                    return redirect(url_for('info_person'))
            elif 'delete' in request.form:
                face_id = request.form['id']
                c.execute("DELETE FROM faces WHERE id=?", (face_id,))
                flash('Face deleted successfully!')
            conn.commit()

        # Fetch all records to display
        c.execute("SELECT * FROM faces")
        persons = c.fetchall()
    finally:
        # Runs on every exit path, including the early redirects above.
        conn.close()
    return render_template('info_person.html', persons=persons)


@socketio.on('connect')
def handle_connect():
    """Send the current match logs to a client that has just connected."""
    print('Client connected')
    emit('update', {'logs': get_match_logs()})


def send_updates():
    """Background loop that pushes the match logs to clients every 5 seconds."""
    while True:
        # Simulate real-time data updates.
        # Server-level socketio.emit() runs outside a client context and
        # already addresses all connected clients; the 'broadcast' keyword
        # belongs to the context-aware emit() and is not accepted here by
        # newer Flask-SocketIO releases.
        socketio.emit('update', {'logs': get_match_logs()})
        eventlet.sleep(5)  # Send an update every 5 seconds


if __name__ == '__main__':
    create_face_database()  # Ensure the database is created
    socketio.start_background_task(send_updates)
    socketio.run(app, debug=True)