match_face/app.py

157 lines
5.6 KiB
Python

from flask import Flask, render_template, request, redirect, url_for, flash
from flask_socketio import SocketIO, emit
import sqlite3
import os
import eventlet
import face_recognition
import numpy as np
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = './static/db_image' # 设置文件上传路径
app.secret_key = 'TG1pSXF40uyHMu0OVrVt4ZsaLLqGekElefvewbI1hzD9UZLV71efR60trjAtsRXg' # Set the secret key for session management
socketio = SocketIO(app, async_mode='eventlet')
def create_face_database(db_name="face_database.db"):
"""创建人脸数据库和匹配日志表"""
conn = sqlite3.connect(db_name)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS faces
(id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
identity TEXT NOT NULL,
image_path TEXT NOT NULL,
encoding BLOB NOT NULL)''')
c.execute('''CREATE TABLE IF NOT EXISTS match_logs
(id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
identity TEXT NOT NULL,
image_path TEXT NOT NULL,
match_time TEXT NOT NULL)''')
conn.commit()
conn.close()
def add_face_to_database(name, identity, image_path, db_name="face_database.db"):
"""将人脸信息添加到数据库"""
conn = sqlite3.connect(db_name)
c = conn.cursor()
# 将图片路径转换为相对路径
relative_image_path = os.path.relpath(image_path, start='./static')
image = face_recognition.load_image_file(image_path)
face_encodings = face_recognition.face_encodings(image)
if face_encodings:
face_encoding = face_encodings[0]
encoding_blob = np.array(face_encoding).tobytes()
c.execute("INSERT INTO faces (name, identity, image_path, encoding) VALUES (?, ?, ?, ?)",
(name, identity, relative_image_path, encoding_blob))
conn.commit()
conn.close()
def get_match_logs(db_name="face_database.db"):
conn = sqlite3.connect(db_name)
c = conn.cursor()
c.execute("SELECT name, identity, image_path, match_time FROM match_logs")
logs = c.fetchall()
conn.close()
return logs
@app.route('/')
def index():
logs = get_match_logs()
return render_template('index.html', logs=logs)
@app.route('/info_person', methods=['GET', 'POST'])
def info_person():
conn = sqlite3.connect('face_database.db')
c = conn.cursor()
if request.method == 'POST':
if 'add' in request.form:
name = request.form['name']
identity = request.form['identity']
image = request.files['image_path']
if image:
image_path = os.path.join(app.config['UPLOAD_FOLDER'], image.filename)
image.save(image_path)
try:
add_face_to_database(name, identity, image_path)
flash('Face added successfully!')
except Exception as e:
flash(f'Error adding face: {e}')
return redirect(url_for('info_person'))
elif 'update' in request.form:
id = request.form['id']
name = request.form['name']
identity = request.form['identity']
image = request.files.get('image_path')
if image:
# Generate new image path
new_image_path = os.path.join(app.config['UPLOAD_FOLDER'], image.filename)
image.save(new_image_path)
# Load the new image and generate new face encoding
try:
loaded_image = face_recognition.load_image_file(new_image_path)
face_encodings = face_recognition.face_encodings(loaded_image)
if face_encodings:
encoding = np.array(face_encodings[0]).tobytes()
new_image_path = "db_image/" + image.filename # Relative path for DB
else:
flash("No face detected in the uploaded image.")
return redirect(url_for('info_person'))
except Exception as e:
flash(f'Error processing image: {e}')
return redirect(url_for('info_person'))
# Update record in the database
c.execute("UPDATE faces SET name=?, identity=?, image_path=?, encoding=? WHERE id=?",
(name, identity, new_image_path, encoding, id))
flash('Face updated successfully!')
else:
# If no new image is uploaded, keep existing image path and encoding
c.execute("UPDATE faces SET name=?, identity=? WHERE id=?",
(name, identity, id))
flash('Information updated successfully!')
elif 'delete' in request.form:
id = request.form['id']
c.execute("DELETE FROM faces WHERE id=?", (id,))
flash('Face deleted successfully!')
conn.commit()
# Fetch all records to display
c.execute("SELECT * FROM faces")
persons = c.fetchall()
conn.close()
return render_template('info_person.html', persons=persons)
@socketio.on('connect')
def handle_connect():
print('Client connected')
emit('update', {'logs': get_match_logs()})
def send_updates():
while True:
# 模拟实时数据更新
socketio.emit('update', {'logs': get_match_logs()}, broadcast=True)
eventlet.sleep(5) # 每 5 秒发送一次更新
if __name__ == '__main__':
create_face_database() # Ensure the database is created
socketio.start_background_task(send_updates)
socketio.run(app, debug=True)