245 lines
7.9 KiB
Python
245 lines
7.9 KiB
Python
|
||
import pyaudio
|
||
import wave
|
||
import requests
|
||
import json
|
||
import base64
|
||
import os
|
||
import edge_tts
|
||
import asyncio
|
||
import pygame
|
||
import openai
|
||
import uuid # 用于生成唯一的文件名
|
||
import sqlite3
|
||
|
||
def create_connection(db_file='conversation.db'):
    """Ensure the ``conversation`` table exists in the SQLite database.

    :param db_file: path of the SQLite database file; SQLite creates the
        file when it does not exist yet.
    :return: None. The connection opened here is closed before returning.
    """
    # Honour the db_file argument rather than a hard-coded file name.
    conn = sqlite3.connect(db_file)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS conversation
                        (id INTEGER PRIMARY KEY AUTOINCREMENT,
                         question TEXT,
                         answer TEXT,
                         audio_path TEXT)''')
        conn.commit()
    finally:
        # Always release the handle, even if the DDL statement fails.
        conn.close()
|
||
|
||
def insert_data(question, answer, audio_path, db_file='conversation.db'):
    """Append one question/answer exchange to the ``conversation`` table.

    :param question: text stored in the ``question`` column
    :param answer: text stored in the ``answer`` column
    :param audio_path: path stored in the ``audio_path`` column
    :param db_file: path of the SQLite database file (defaults to the
        file name this function has always used)
    :return: None
    :raises sqlite3.Error: if the insert fails, e.g. the table is missing
    """
    conn = sqlite3.connect(db_file)
    try:
        # Parameter binding keeps arbitrary recognised text from being
        # interpreted as SQL.
        conn.execute(
            "INSERT INTO conversation (question, answer, audio_path) VALUES (?, ?, ?)",
            (question, answer, audio_path),
        )
        conn.commit()
    finally:
        # Close the connection even when the INSERT raises.
        conn.close()
|
||
# 1. Recording
|
||
# Record audio with PyAudio (writes a WAV file)
|
||
def audio_record(rec_time, filename):
    """Record from the default input device and save the result as a WAV file.

    The recording is mono, 16-bit, 16000 Hz.

    :param rec_time: recording duration in seconds
    :param filename: path of the WAV file to write
    :return: None; the audio is written to ``filename``
    """
    chunk = 1024                    # frames read per stream.read() call
    sample_format = pyaudio.paInt16  # 16-bit samples
    channels = 1                    # mono
    rate = 16000                    # sampling rate in Hz

    p = pyaudio.PyAudio()
    try:
        # Read the sample width before terminate() so the PyAudio object
        # is not used after it has been shut down.
        sample_width = p.get_sample_size(sample_format)

        stream = p.open(format=sample_format,
                        channels=channels,
                        rate=rate,
                        input=True,
                        frames_per_buffer=chunk)
        try:
            print('Start recording...')
            frames = [stream.read(chunk)
                      for _ in range(int(rate / chunk * rec_time))]
        finally:
            # Release the input stream even if a read fails.
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()
    print('recording done...')

    # The with-statement closes the file; no explicit close() is needed.
    with wave.open(filename, 'wb') as f:
        f.setnchannels(channels)
        f.setsampwidth(sample_width)
        f.setframerate(rate)
        f.writeframes(b''.join(frames))
|
||
|
||
|
||
# 2. Obtain the Baidu access token
|
||
|
||
# Baidu speech API credentials.
# NOTE(review): secrets are committed in source. Supply them through the
# BAIDU_API_KEY / BAIDU_SECRET_KEY environment variables instead; the
# literals remain only as fallbacks so existing setups keep working and
# should be rotated and removed.
API_KEY = os.environ.get("BAIDU_API_KEY", "7myE5M0cY5gjyKbxcFQqWmZE")
SECRET_KEY = os.environ.get("BAIDU_SECRET_KEY", "A2AtUqbqVLdo0kgfiwITWUlB0fxwCA3w")
|
||
|
||
|
||
def get_access_token():
    """Exchange API_KEY / SECRET_KEY for a Baidu OAuth access token.

    :return: the access token as a string, or None when the request fails
        or the response carries no ``access_token`` field
    """
    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {
        "grant_type": "client_credentials",
        "client_id": API_KEY,
        "client_secret": SECRET_KEY,
    }
    try:
        # A timeout keeps a stalled connection from hanging the program.
        response = requests.post(url, params=params, timeout=10)
        response.raise_for_status()
        payload = response.json()
    except (requests.RequestException, ValueError):
        return None

    token = payload.get("access_token") if isinstance(payload, dict) else None
    # Return a real None, not the string "None", when the token is missing.
    return str(token) if token else None
|
||
|
||
|
||
# 3. Upload the recording for speech recognition
|
||
def BaiduYuYin(file_url, token):
    """Send a WAV recording to Baidu speech recognition and return the text.

    :param file_url: path of the recorded audio file
    :param token: access token for the Baidu API
    :return: the first recognition result, or the fallback string
        '识别不清楚' when the file cannot be read, the request fails, or the
        response contains no result
    """
    fallback = '识别不清楚'

    # Read the audio once; the same bytes give both the payload and its size.
    try:
        with open(file_url, 'rb') as f:
            raw_audio = f.read()
    except OSError:
        return fallback

    url = 'https://vop.baidu.com/server_api'
    headers = {'Content-Type': 'application/json',
               'Accept': 'application/json'}
    data = {
        "format": 'wav',                             # audio container format
        "rate": '16000',                             # sampling rate
        "dev_pid": '1536',                           # recognition model id
        "speech": base64.b64encode(raw_audio).decode('utf-8'),
        "cuid": 'rvs7K414cquxm4f62jtasIRi6iNRNXR6',  # client identifier
        "len": len(raw_audio),                       # byte count before base64
        "channel": 1,                                # mono
        "token": token,
    }

    try:
        req = requests.request("POST", url, data=json.dumps(data),
                               headers=headers, timeout=30)
        data_dict = json.loads(req.text)
        return data_dict['result'][0]
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError):
        # Network failure, non-JSON body, or a response without 'result'.
        # KeyboardInterrupt / SystemExit are deliberately not swallowed.
        return fallback
|
||
|
||
|
||
from ollama import Client
|
||
def get_completion(prompt, model="solar", host='http://8.130.118.164:11434/'):
    """Send a single user message to an Ollama server and return the reply.

    :param prompt: text sent as the content of the user message
    :param model: name of the model to chat with
    :param host: base URL of the Ollama server; defaults to the address
        this function previously hard-coded
    :return: the ``content`` of the ``message`` in the chat response
    """
    client = Client(host=host)
    messages = [{'role': 'user', 'content': prompt}]
    response = client.chat(model, messages=messages)
    return response['message']['content']
|
||
|
||
|
||
# 5. Text-to-speech (TTS) with edge-tts
|
||
|
||
async def generate_audio_from_text(text, file_url):
    """Synthesize ``text`` with edge-tts and save the audio to ``file_url``.

    :param text: text to convert to speech
    :param file_url: path of the audio file to write
    :return: None
    """
    # Create the target directory first so saving does not fail on a
    # fresh checkout where it does not exist yet.
    out_dir = os.path.dirname(file_url)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    tts = edge_tts.Communicate(
        text=text,
        voice='zh-CN-YunxiNeural',
        rate='-4%',
        volume='+0%',
    )
    await tts.save(file_url)
|
||
|
||
|
||
# 6. Play the audio file with pygame
|
||
def play_mp3(mp3_file):
    """Play an audio file through pygame and block until playback ends.

    :param mp3_file: path of the audio file to play
    :return: None
    """
    pygame.init()
    mixer = pygame.mixer
    mixer.init()

    player = mixer.music
    player.load(mp3_file)
    player.play()

    # Poll a few times per second so the function does not return
    # before the audio has finished playing.
    ticker = pygame.time.Clock()
    while True:
        if not player.get_busy():
            break
        ticker.tick(3)
|
||
|
||
import cv2
|
||
import socket
|
||
import threading
|
||
import queue
|
||
import time
|
||
|
||
def detect_face(face_queue):
    """Show the camera feed and signal ``face_queue`` when a face appears.

    ``True`` is put on the queue once each time a face is detected after
    a frame in which none was detected. The loop ends when a frame cannot
    be read or the 'q' key is pressed.

    :param face_queue: queue that receives ``True`` on each new detection
    :return: None
    """
    face_cascade = cv2.CascadeClassifier(
        cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    video_capture = cv2.VideoCapture(0)  # open the camera at index 0
    face_detected = False  # whether a face was present in the previous frames

    try:
        while True:
            ret, frame = video_capture.read()
            if not ret:
                print("无法读取摄像头画面")
                break

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(
                gray, scaleFactor=1.2, minNeighbors=5)

            # Draw a rectangle around every detected face.
            for (x, y, w, h) in faces:
                cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)

            cv2.imshow('Video', frame)

            # len() is used on purpose: the result may be an array, whose
            # truth value is ambiguous.
            if len(faces) > 0 and not face_detected:
                face_queue.put(True)
                face_detected = True

            if len(faces) == 0:
                # Re-arm so the next appearance is reported again.
                face_detected = False

            if cv2.waitKey(1) & 0xFF == ord('q'):  # press 'q' to quit
                break
    finally:
        # Release the camera and windows even if the loop raises.
        video_capture.release()
        cv2.destroyAllWindows()
|
||
|
||
# main.py
|
||
|
||
def start_main():
    """Run one voice round-trip: record, recognise, answer, speak, store.

    Records five seconds of audio, sends it to Baidu speech recognition,
    passes the recognised text to the language model, turns the reply into
    speech, stores the exchange in the database and plays the audio.

    :return: None
    """
    # Make sure the conversation table exists; otherwise insert_data
    # fails on a fresh database.
    create_connection()

    print("识别到人脸,开始录音...")
    user_audio = './user_audio.wav'
    audio_record(5, user_audio)

    baidu_token = get_access_token()
    baidu_result = BaiduYuYin(user_audio, baidu_token)
    print("语音识别结果:", baidu_result)

    model_response = get_completion(baidu_result)
    print("模型回复:", model_response)

    # The database stores the path relative to static/; the file itself
    # lives under static/.
    audio_filename = f"audio/{uuid.uuid4()}.mp3"
    unique_audio_filename = f"static/{audio_filename}"
    # The output directory may not exist yet on a fresh checkout.
    os.makedirs(os.path.dirname(unique_audio_filename), exist_ok=True)

    print("开始生成音频文件:", unique_audio_filename)
    asyncio.run(generate_audio_from_text(model_response, unique_audio_filename))
    insert_data(baidu_result, model_response, audio_filename)
    play_mp3(unique_audio_filename)


if __name__ == "__main__":
    start_main()
|
||
|
||
|
||
|
||
|