###########################################################################################
class QwenTTS(BaseTTS):
def __init__(self, opt, parent):
super().__init__(opt, parent)
self.api_key = getattr(opt, ’QWEN_API_KEY’, ’’) # 阿里云API Key
self.model = getattr(opt, ’QWEN_MODEL’, ’qwen3-tts-flash’) # 模型名称
self.voice = getattr(opt, ’QWEN_VOICE’, ’Cherry’) # 音色
self.language_type = getattr(opt, ’QWEN_LANGUAGE_TYPE’, ’Chinese’) # 语言类型
self.api_url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation"
# 创建 tempwav 目录
self.temp_dir = "tempwav"
os.makedirs(self.temp_dir, exist_ok=True)
# 验证必要的配置参数
if not self.api_key:
logger.error("Qwen TTS 配置不完整,请设置 QWEN_API_KEY")
raise ValueError("Qwen TTS 配置不完整")
def txt_to_audio(self, msg):
text, textevent = msg
# 直接调用同步版本的 TTS
try:
self.sync_qwen_tts(text, msg)
except Exception as e:
logger.error(f"Qwen TTS 处理失败: {e}")
# 发送结束事件,即使失败也要通知前端
eventpoint = {’type’: ’tts’, ’status’: ’end’, ’text’: text, ’msgevent’: textevent}
self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint)
def sync_qwen_tts(self, text, msg):
"""同步版本的 Qwen TTS"""
start = time.perf_counter()
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"X-DashScope-SSE": "enable" # 启用服务器发送事件
}
payload = {
"model": self.model,
"input": {
"text": text,
"voice": self.voice,
"language_type": self.language_type
}
}
try:
logger.info(f"Qwen TTS 请求开始: {text[:50]}...")
response = requests.post(
self.api_url,
headers=headers,
json=payload,
stream=True,
timeout=30
)
end = time.perf_counter()
logger.info(f"Qwen TTS 请求时间: {end-start:.2f}s")
if response.status_code != 200:
logger.error(f"Qwen TTS 请求失败: {response.status_code}, {response.text}")
return
# 收集所有音频数据
all_audio_data = bytearray()
audio_url = None
for line in response.iter_lines(decode_unicode=False):
if not line:
continue
try:
line_str = line.decode(’utf-8’).strip()
# 检查是否是 SSE 数据行
if line_str.startswith(’data:’):
data_str = line_str[5:].strip()
# 检查结束标记
if data_str == ’[DONE]’:
logger.info("Qwen TTS 流式请求完成")
break
# 解析 JSON 数据
data = json.loads(data_str)
# 检查是否有输出数据
if "output" in data and isinstance(data["output"], dict):
output = data["output"]
# 检查是否有音频 URL
if "audio" in output and isinstance(output["audio"], dict):
audio_info = output["audio"]
# 检查是否有音频 URL
if "url" in audio_info and audio_info["url"]:
audio_url = audio_info["url"]
logger.info(f"获取到音频 URL: {audio_url}")
# 检查是否有直接的数据(虽然看起来是空的)
elif "data" in audio_info and audio_info["data"]:
# 这里的数据看起来是空的,但以防万一还是处理
audio_data = audio_info["data"]
if audio_data and len(audio_data) > 10: # 确保不是空数据
try:
chunk_audio = base64.b64decode(audio_data)
all_audio_data.extend(chunk_audio)
logger.debug(f"解码后音频块大小: {len(chunk_audio)}")
except Exception as e:
logger.debug(f"Base64 解码失败,可能是空数据: {e}")
# 检查错误
elif "code" in data and data["code"] != 200:
logger.error(f"Qwen TTS API 错误: {data}")
break
except json.JSONDecodeError as e:
logger.warning(f"JSON 解析错误: {e}")
continue
except Exception as e:
logger.error(f"处理 Qwen TTS 流时出错: {e}")
continue
# 处理音频数据 - 优先使用 URL 下载
if audio_url:
logger.info(f"从 URL 下载音频: {audio_url}")
try:
# 下载音频文件
audio_response = requests.get(audio_url, timeout=30)
if audio_response.status_code == 200:
audio_content = audio_response.content
logger.info(f"从 URL 下载音频成功,大小: {len(audio_content)} 字节")
self.process_audio_data(audio_content, msg)
else:
logger.error(f"从 URL 下载音频失败: {audio_response.status_code}")
# 如果 URL 下载失败,尝试使用收集的 base64 数据
if all_audio_data:
logger.info(f"使用收集的音频数据,大小: {len(all_audio_data)} 字节")
self.process_audio_data(bytes(all_audio_data), msg)
except Exception as e:
logger.error(f"从 URL 下载音频时出错: {e}")
# 如果 URL 下载失败,尝试使用收集的 base64 数据
if all_audio_data:
logger.info(f"使用收集的音频数据,大小: {len(all_audio_data)} 字节")
self.process_audio_data(bytes(all_audio_data), msg)
elif all_audio_data:
logger.info(f"使用收集的音频数据,大小: {len(all_audio_data)} 字节")
self.process_audio_data(bytes(all_audio_data), msg)
else:
logger.warning("Qwen TTS 未收到音频数据")
except requests.exceptions.RequestException as e:
logger.error(f"Qwen TTS 网络请求错误: {e}")
except Exception as e:
logger.exception(f"Qwen TTS 未知错误: {e}")
def process_audio_data(self, audio_data, msg):
"""处理完整的音频数据"""
text, textevent = msg
try:
# 生成临时文件名,包含会话ID和时间戳
timestamp = int(time.time())
session_id = getattr(self.opt, ’sessionid’, 0)
temp_file = os.path.join(self.temp_dir, f"qwen_audio_{session_id}_{timestamp}.wav")
with open(temp_file, ’wb’) as f:
f.write(audio_data)
logger.info(f"音频数据已保存到: {temp_file}")
# 使用 soundfile 读取音频
byte_stream = BytesIO(audio_data)
# 检查文件格式
try:
stream, sample_rate = sf.read(byte_stream)
logger.info(f’Qwen TTS 音频流 {sample_rate}: {stream.shape}’)
except Exception as e:
logger.error(f"无法读取音频文件: {e}")
# 可能是其他格式,尝试使用其他方法
try:
# 重置字节流
byte_stream.seek(0)
# 尝试使用 torchaudio
import torchaudio
stream, sample_rate = torchaudio.load(byte_stream)
stream = stream.numpy()[0] # 取第一个声道
logger.info(f’使用 torchaudio 读取音频: {sample_rate}: {stream.shape}’)
except Exception as e2:
logger.error(f"torchaudio 也无法读取: {e2}")
# 发送结束事件
end_event = {’type’: ’tts’, ’status’: ’end’, ’text’: text, ’msgevent’: textevent}
self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), end_event)
return
stream = stream.astype(np.float32)
if stream.ndim > 1:
logger.info(f’音频有 {stream.shape[1]} 个声道,只使用第一个’)
stream = stream[:, 0]
if sample_rate != self.sample_rate and stream.shape[0] > 0:
logger.info(f’音频采样率为 {sample_rate}, 重采样为 {self.sample_rate}’)
stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
# 发送开始事件
start_event = {’type’: ’tts’, ’status’: ’start’, ’text’: text, ’msgevent’: textevent}
self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), start_event)
# 分割并发送音频帧
streamlen = stream.shape[0]
idx = 0
while streamlen >= self.chunk and self.state == State.RUNNING:
self.parent.put_audio_frame(stream[idx:idx+self.chunk], None)
streamlen -= self.chunk
idx += self.chunk
# 处理剩余数据
if streamlen > 0 and self.state == State.RUNNING:
padding = np.zeros(self.chunk - streamlen, dtype=np.float32)
complete_chunk = np.concatenate((stream[idx:], padding))
self.parent.put_audio_frame(complete_chunk, None)
# 发送结束事件
end_event = {’type’: ’tts’, ’status’: ’end’, ’text’: text, ’msgevent’: textevent}
self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), end_event)
logger.info("Qwen TTS 音频处理完成")
except Exception as e:
logger.error(f"处理 Qwen TTS 音频数据时出错: {e}")
# 确保发送结束事件
end_event = {’type’: ’tts’, ’status’: ’end’, ’text’: text, ’msgevent’: textevent}
self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), end_event)
###########################################################################################