AIchat/main.py

245 lines
7.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pyaudio
import wave
import requests
import json
import base64
import os
import edge_tts
import asyncio
import pygame
import openai
import uuid # 用于生成唯一的文件名
import sqlite3
def create_connection(db_file='conversation.db'):
    """
    Ensure the SQLite database file and its `conversation` table exist.

    :param db_file: path of the SQLite database (created if missing)
    :return: None
    """
    # Fix: the original ignored db_file and always opened 'conversation.db',
    # and it leaked the connection (no commit, no close). Honor the parameter
    # and always release the handle.
    conn = sqlite3.connect(db_file)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS conversation
                        (id INTEGER PRIMARY KEY AUTOINCREMENT,
                         question TEXT,
                         answer TEXT,
                         audio_path TEXT)''')
        conn.commit()
    finally:
        conn.close()
def insert_data(question, answer, audio_path, db_file='conversation.db'):
    """
    Persist one conversation round (question, answer, audio path).

    :param question: recognized user utterance
    :param answer: model reply text
    :param audio_path: path of the generated TTS audio file
    :param db_file: path of the SQLite database (new, defaulted parameter;
                    default preserves the previous hard-coded behavior)
    :return: None
    """
    conn = sqlite3.connect(db_file)
    try:
        conn.execute(
            "INSERT INTO conversation (question, answer, audio_path) VALUES (?, ?, ?)",
            (question, answer, audio_path))
        conn.commit()
    finally:
        # Fix: close even if the INSERT raises (original leaked on error).
        conn.close()
# 1. Recording
# Record audio with PyAudio (produces a WAV file)
def audio_record(rec_time, filename):
    """
    Record audio from the default microphone and save it as a WAV file.

    :param rec_time: recording duration in seconds
    :param filename: output WAV file path
    :return: None (writes the file as a side effect)
    """
    CHUNK = 1024              # frames per buffer read
    FORMAT = pyaudio.paInt16  # 16-bit samples
    CHANNELS = 1              # mono
    RATE = 16000              # 16 kHz — what the Baidu ASR call expects
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK)
    print('Start recording...')
    frames = []  # collected raw audio chunks
    try:
        # RATE / CHUNK reads per second of audio.
        for _ in range(int(RATE / CHUNK * rec_time)):
            frames.append(stream.read(CHUNK))
    finally:
        # Fix: release the audio device even if stream.read raises
        # (the original leaked the stream/device on error).
        stream.stop_stream()
        stream.close()
        p.terminate()
    print('recording done...')
    with wave.open(filename, 'wb') as f:
        f.setnchannels(CHANNELS)
        f.setsampwidth(p.get_sample_size(FORMAT))  # bytes per sample
        f.setframerate(RATE)
        f.writeframes(b''.join(frames))
        # note: the redundant f.close() inside the `with` block was removed
# 2. Obtain the Baidu access token
# NOTE(review): credentials were hard-coded here — a security risk once the
# file is shared. Prefer the environment; the original values remain as
# fallbacks so existing deployments keep working unchanged.
API_KEY = os.getenv("BAIDU_API_KEY", "7myE5M0cY5gjyKbxcFQqWmZE")
SECRET_KEY = os.getenv("BAIDU_SECRET_KEY", "A2AtUqbqVLdo0kgfiwITWUlB0fxwCA3w")
def get_access_token():
    """
    Exchange the API_KEY/SECRET_KEY pair (AK/SK auth) for a Baidu OAuth token.

    :return: the access token string, or None on failure. (The original
             wrapped the result in str(), so a failed request yielded the
             literal string "None" — contradicting its own docstring.)
    """
    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {
        "grant_type": "client_credentials",
        "client_id": API_KEY,
        "client_secret": SECRET_KEY,
    }
    try:
        # Fix: bound the wait; the original could hang forever on a dead link.
        resp = requests.post(url, params=params, timeout=10)
        token = resp.json().get("access_token")
    except Exception:
        return None
    return str(token) if token is not None else None
# 3. Upload the recording for speech recognition
def BaiduYuYin(file_url, token):
    """
    Upload a local recording to Baidu's short-speech recognition API.

    :param file_url: path of the recorded audio file (16 kHz mono WAV)
    :param token: Baidu access token from get_access_token()
    :return: the recognized text, or '识别不清楚' when reading or
             recognition fails (best-effort contract kept from the original)
    """
    RATE = '16000'
    FORMAT = 'wav'
    CUID = 'rvs7K414cquxm4f62jtasIRi6iNRNXR6'
    DEV_PID = '1536'  # Mandarin model with basic English support
    # Read and base64-encode the audio. A missing/unreadable file is treated
    # as an unrecognizable utterance rather than crashing the pipeline.
    try:
        with open(file_url, 'rb') as f:
            speech = base64.b64encode(f.read()).decode('utf-8')
        size = os.path.getsize(file_url)  # byte count of the audio file
    except OSError:
        return '识别不清楚'
    headers = {'Content-Type': 'application/json',
               'Accept': 'application/json'}  # JSON POST, audio inlined in body
    url = 'https://vop.baidu.com/server_api'
    data = {
        "format": FORMAT,    # audio container format
        "rate": RATE,        # sampling rate, fixed at 16000
        "dev_pid": DEV_PID,  # recognition model id
        "speech": speech,    # base64-encoded audio bytes
        "cuid": CUID,        # unique client id (e.g. MAC/IMEI, <= 60 chars)
        "len": size,         # raw audio size in bytes
        "channel": 1,        # mono only, fixed value 1
        "token": token,
    }
    # Fix: the original used a bare `except:` over the whole body and had no
    # request timeout. Keep the best-effort behavior, but only around the
    # network/response handling, and bound the wait.
    try:
        resp = requests.post(url, data=json.dumps(data), headers=headers,
                             timeout=30)
        return resp.json()['result'][0]  # first (best) transcription
    except Exception:
        return '识别不清楚'
from ollama import Client
def get_completion(prompt, model="solar", host='http://8.130.118.164:11434/'):
    """
    Send a single-turn prompt to an Ollama server and return the reply text.

    :param prompt: user message to send
    :param model: Ollama model name (default "solar")
    :param host: Ollama server URL — generalized from the previously
                 hard-coded address; the default preserves old behavior
    :return: the model's reply content string
    """
    client = Client(host=host)
    response = client.chat(model, messages=[
        {
            'role': 'user',
            'content': prompt,
        },
    ])
    return response['message']['content']
# 5. Text-to-speech (TTS) via edge-tts
async def generate_audio_from_text(text, file_url):
    """
    Synthesize speech from text with edge-tts and save it to an audio file.

    :param text: text to synthesize
    :param file_url: output audio file path
    :return: None (writes the file as a side effect)
    """
    # Removed the redundant `output = file_url` / intermediate aliases from
    # the original; voice/rate/volume are passed directly.
    tts = edge_tts.Communicate(
        text=text,
        voice='zh-CN-YunxiNeural',  # Mandarin (male) neural voice
        rate='-4%',                 # slightly slower than default
        volume='+0%',               # unchanged volume
    )
    await tts.save(file_url)
# 6. Play an audio file with pygame
def play_mp3(mp3_file):
    """
    Play an audio file and block until playback finishes.

    :param mp3_file: path of the audio file to play
    :return: None
    """
    pygame.init()         # initialize pygame
    pygame.mixer.init()   # initialize the audio mixer
    pygame.mixer.music.load(mp3_file)
    pygame.mixer.music.play()
    # Poll a few times per second so the process stays alive until the mixer
    # reports that playback has ended.
    ticker = pygame.time.Clock()
    while pygame.mixer.music.get_busy():
        ticker.tick(3)
import cv2
import socket
import threading
import queue
import time
def detect_face(face_queue):
    """
    Watch the default webcam and push True onto face_queue each time a face
    newly appears (edge-triggered: one event per continuous appearance).

    :param face_queue: queue that receives True on each fresh detection
    :return: None (runs until 'q' is pressed or the camera read fails)
    """
    cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    face_cascade = cv2.CascadeClassifier(cascade_path)
    video_capture = cv2.VideoCapture(0)  # open the default camera
    face_present = False  # True while a face stays continuously on screen
    while True:
        ok, frame = video_capture.read()
        if not ok:
            print("无法读取摄像头画面")
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, scaleFactor=1.2, minNeighbors=5)
        # Outline every detected face in the preview window.
        for (x, y, w, h) in faces:
            cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
        cv2.imshow('Video', frame)
        if faces is not None and len(faces) > 0 and not face_present:
            # Rising edge: a face just appeared — notify the consumer once.
            face_queue.put(True)
            face_present = True
        if len(faces) == 0:
            # No face in frame: re-arm so the next appearance triggers again.
            face_present = False
        if cv2.waitKey(1) & 0xFF == ord('q'):  # press 'q' to quit
            break
    video_capture.release()
    cv2.destroyAllWindows()
# Main pipeline: record -> Baidu ASR -> Ollama LLM -> edge-tts -> persist -> play
def start_main():
    """
    Run one full interaction round: record the user, transcribe with Baidu
    ASR, answer with the Ollama model, synthesize the reply with edge-tts,
    store the round in SQLite, and play the audio back.
    """
    print("识别到人脸,开始录音...")
    audio_record(5, 'user_audio.wav')
    baidu_token = get_access_token()
    baidu_result = BaiduYuYin('./user_audio.wav', baidu_token)
    print("语音识别结果:", baidu_result)
    model_response = get_completion(baidu_result)
    print("模型回复:", model_response)
    # DB stores the web-relative path; the file itself lives under static/.
    audio_filename = f"audio/{uuid.uuid4()}.mp3"
    unique_audio_filename = f"static/{audio_filename}"
    # Fix: edge-tts cannot create missing directories, and insert_data needs
    # the conversation table — make sure both exist before writing.
    os.makedirs(os.path.dirname(unique_audio_filename), exist_ok=True)
    create_connection()
    print("开始生成音频文件:", unique_audio_filename)
    asyncio.run(generate_audio_from_text(model_response, unique_audio_filename))
    insert_data(baidu_result, model_response, audio_filename)
    play_mp3(unique_audio_filename)


if __name__ == "__main__":
    start_main()