# match_face/app.py
# (118 lines, 4.2 KiB, Python)

from flask import Flask, render_template, request, redirect, url_for
from flask_socketio import SocketIO, emit
import sqlite3
import os
import eventlet
import face_recognition
app = Flask(__name__)
# Directory where uploaded face images are written.
app.config.update(UPLOAD_FOLDER='./static/db_image')
socketio = SocketIO(app, async_mode='eventlet')
def get_match_logs(db_name="face_database.db"):
    """Return every row of the ``match_logs`` table.

    Args:
        db_name: Path of the SQLite database file to read.

    Returns:
        A list of ``(name, identity, image_path, match_time)`` tuples. No
        ORDER BY is applied, so the order is whatever SQLite yields.

    Raises:
        sqlite3.Error: If the query fails, e.g. the table does not exist.
    """
    conn = sqlite3.connect(db_name)
    try:
        cursor = conn.execute(
            "SELECT name, identity, image_path, match_time FROM match_logs"
        )
        return cursor.fetchall()
    finally:
        # Close even when the query raises so a failing call does not leak
        # the connection handle.
        conn.close()
# Home page: shows the match records.
@app.route('/')
def index():
    """Render ``index.html`` with all match-log rows passed in as ``logs``."""
    return render_template('index.html', logs=get_match_logs())
def _store_face_image(image):
    """Save an uploaded face image and compute its face encoding.

    Args:
        image: The uploaded file object taken from ``request.files``.

    Returns:
        A ``(image_path, encoding, error)`` tuple. On success ``error`` is
        ``None``, ``image_path`` is the ``db_image/<filename>`` value to store
        in the database and ``encoding`` is the first detected face encoding
        as a comma-joined string. On failure ``error`` is the message to send
        back to the client and the other two items are ``None``.
    """
    # Keep only the final path component so a crafted filename such as
    # "../../app.py" cannot make the save escape the upload folder.
    filename = os.path.basename((image.filename or "").replace("\\", "/"))
    if filename in ("", ".", ".."):
        return None, None, "Invalid image filename."

    saved_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    image.save(saved_path)

    loaded_image = face_recognition.load_image_file(saved_path)
    face_encodings = face_recognition.face_encodings(loaded_image)
    if not face_encodings:
        return None, None, "No face detected in the uploaded image."

    # Encoding values are floats; str() is used because the binary format
    # code ('b') is only valid for integers and raises ValueError here.
    encoding = ','.join(map(str, face_encodings[0]))
    return "db_image/" + filename, encoding, None


# Person page: lists all persons and handles add / update / delete requests.
@app.route('/info_person', methods=['GET', 'POST'])
def info_person():
    """List the rows of the ``faces`` table and apply CRUD form submissions.

    On POST the action is chosen by which key is present in the form:

    * ``add``: inserts a row from ``name`` and ``identity`` plus an optional
      uploaded ``image_path`` file; without a file, empty strings are stored
      for the image path and encoding.
    * ``update``: rewrites the row whose id is ``id``; when no new file is
      uploaded the submitted ``current_image_path`` and ``encoding`` values
      are kept.
    * ``delete``: removes the row whose id is ``id``.

    Returns:
        The rendered ``info_person.html`` with all rows passed as
        ``persons``, or a plain error message string when an uploaded image
        cannot be used (nothing is written to the database in that case).
    """
    conn = sqlite3.connect('face_database.db')
    try:
        c = conn.cursor()
        if request.method == 'POST':
            form = request.form

            if 'add' in form:
                name = form['name']
                identity = form['identity']
                # .get() so that a missing file part reaches the no-image
                # branch below instead of aborting the request.
                image = request.files.get('image_path')
                if image:
                    image_path, encoding, error = _store_face_image(image)
                    if error:
                        return error
                else:
                    image_path = ""
                    encoding = ""
                c.execute(
                    "INSERT INTO faces (name, identity, image_path, encoding) VALUES (?, ?, ?, ?)",
                    (name, identity, image_path, encoding),
                )

            elif 'update' in form:
                person_id = form['id']
                name = form['name']
                identity = form['identity']
                image = request.files.get('image_path')
                if image:
                    # Same storage format as the add branch, so updated rows
                    # carry the "db_image/" prefix as well.
                    image_path, encoding, error = _store_face_image(image)
                    if error:
                        return error
                else:
                    # No new upload: keep the values the form sent back.
                    image_path = form['current_image_path']
                    encoding = form['encoding']
                c.execute(
                    "UPDATE faces SET name=?, identity=?, image_path=?, encoding=? WHERE id=?",
                    (name, identity, image_path, encoding, person_id),
                )

            elif 'delete' in form:
                person_id = form['id']
                c.execute("DELETE FROM faces WHERE id=?", (person_id,))

            conn.commit()

        # Fetch every person row for display.
        c.execute("SELECT * FROM faces")
        persons = c.fetchall()
    finally:
        # Runs on the early error returns and on exceptions too, so the
        # connection is never left open.
        conn.close()
    return render_template('info_person.html', persons=persons)
# Handle a new WebSocket connection.
@socketio.on('connect')
def handle_connect():
    """Log the connection and emit an ``update`` event with the current match logs."""
    print('Client connected')
    payload = {'logs': get_match_logs()}
    emit('update', payload)
# Push periodic updates to the clients.
def send_updates():
    """Emit the current match logs as an ``update`` event every 5 seconds.

    Loops forever, so it is meant to be started as a background task rather
    than called directly from a request handler.
    """
    while True:
        try:
            logs = get_match_logs()
        except sqlite3.Error as exc:
            # A failed read (e.g. a locked database) should cost one tick,
            # not end the update loop for good.
            print('Failed to read match logs:', exc)
        else:
            # A server-level emit already goes to all connected clients.
            # The former broadcast=True keyword is dropped because recent
            # Flask-SocketIO releases forward unknown keywords to the
            # underlying server, which rejects it with a TypeError.
            socketio.emit('update', {'logs': logs})
        eventlet.sleep(5)  # Send an update every 5 seconds.
def main():
    """Start the periodic update task, then run the Socket.IO server in debug mode."""
    socketio.start_background_task(send_updates)
    socketio.run(app, debug=True)


if __name__ == '__main__':
    main()