微立顶科技

新闻资讯

创新 服务 价值

  实时数字人新增QwenTTS流式语音合成

发布日期:2025/10/18 13:37:34      浏览量:

实时数字人新增QwenTTS流式语音合成方式


1. ttsreal.py 新增

###########################################################################################
class QwenTTS(BaseTTS):
    def __init__(self, opt, parent):
        super().__init__(opt, parent)
        self.api_key = getattr(opt, ’QWEN_API_KEY’, ’’)  # 阿里云API Key
        self.model = getattr(opt, ’QWEN_MODEL’, ’qwen3-tts-flash’)  # 模型名称
        self.voice = getattr(opt, ’QWEN_VOICE’, ’Cherry’)  # 音色
        self.language_type = getattr(opt, ’QWEN_LANGUAGE_TYPE’, ’Chinese’)  # 语言类型
        self.api_url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation"

        # 创建 tempwav 目录
        self.temp_dir = "tempwav"
        os.makedirs(self.temp_dir, exist_ok=True)

       
        # 验证必要的配置参数
        if not self.api_key:
            logger.error("Qwen TTS 配置不完整,请设置 QWEN_API_KEY")
            raise ValueError("Qwen TTS 配置不完整")

    def txt_to_audio(self, msg):
        text, textevent = msg
       
        # 直接调用同步版本的 TTS
        try:
            self.sync_qwen_tts(text, msg)
        except Exception as e:
            logger.error(f"Qwen TTS 处理失败: {e}")
            # 发送结束事件,即使失败也要通知前端
            eventpoint = {’type’: ’tts’, ’status’: ’end’, ’text’: text, ’msgevent’: textevent}
            self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint)

    def sync_qwen_tts(self, text, msg):
        """同步版本的 Qwen TTS"""
        start = time.perf_counter()
       
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "X-DashScope-SSE": "enable"  # 启用服务器发送事件
        }

        payload = {
            "model": self.model,
            "input": {
                "text": text,
                "voice": self.voice,
                "language_type": self.language_type
            }
        }

        try:
            logger.info(f"Qwen TTS 请求开始: {text[:50]}...")
            response = requests.post(
                self.api_url,
                headers=headers,
                json=payload,
                stream=True,
                timeout=30
            )
           
            end = time.perf_counter()
            logger.info(f"Qwen TTS 请求时间: {end-start:.2f}s")

            if response.status_code != 200:
                logger.error(f"Qwen TTS 请求失败: {response.status_code}, {response.text}")
                return

            # 收集所有音频数据
            all_audio_data = bytearray()
            audio_url = None
           
            for line in response.iter_lines(decode_unicode=False):
                if not line:
                    continue
                   
                try:
                    line_str = line.decode(’utf-8’).strip()
                   
                    # 检查是否是 SSE 数据行
                    if line_str.startswith(’data:’):
                        data_str = line_str[5:].strip()
                       
                        # 检查结束标记
                        if data_str == ’[DONE]’:
                            logger.info("Qwen TTS 流式请求完成")
                            break
                       
                        # 解析 JSON 数据
                        data = json.loads(data_str)
                       
                        # 检查是否有输出数据
                        if "output" in data and isinstance(data["output"], dict):
                            output = data["output"]
                           
                            # 检查是否有音频 URL
                            if "audio" in output and isinstance(output["audio"], dict):
                                audio_info = output["audio"]
                               
                                # 检查是否有音频 URL
                                if "url" in audio_info and audio_info["url"]:
                                    audio_url = audio_info["url"]
                                    logger.info(f"获取到音频 URL: {audio_url}")
                               
                                # 检查是否有直接的数据(虽然看起来是空的)
                                elif "data" in audio_info and audio_info["data"]:
                                    # 这里的数据看起来是空的,但以防万一还是处理
                                    audio_data = audio_info["data"]
                                    if audio_data and len(audio_data) > 10:  # 确保不是空数据
                                        try:
                                            chunk_audio = base64.b64decode(audio_data)
                                            all_audio_data.extend(chunk_audio)
                                            logger.debug(f"解码后音频块大小: {len(chunk_audio)}")
                                        except Exception as e:
                                            logger.debug(f"Base64 解码失败,可能是空数据: {e}")
                           
                        # 检查错误
                        elif "code" in data and data["code"] != 200:
                            logger.error(f"Qwen TTS API 错误: {data}")
                            break
                           
                except json.JSONDecodeError as e:
                    logger.warning(f"JSON 解析错误: {e}")
                    continue
                except Exception as e:
                    logger.error(f"处理 Qwen TTS 流时出错: {e}")
                    continue
           
            # 处理音频数据 - 优先使用 URL 下载
            if audio_url:
                logger.info(f"从 URL 下载音频: {audio_url}")
                try:
                    # 下载音频文件
                    audio_response = requests.get(audio_url, timeout=30)
                    if audio_response.status_code == 200:
                        audio_content = audio_response.content
                        logger.info(f"从 URL 下载音频成功,大小: {len(audio_content)} 字节")
                        self.process_audio_data(audio_content, msg)
                    else:
                        logger.error(f"从 URL 下载音频失败: {audio_response.status_code}")
                        # 如果 URL 下载失败,尝试使用收集的 base64 数据
                        if all_audio_data:
                            logger.info(f"使用收集的音频数据,大小: {len(all_audio_data)} 字节")
                            self.process_audio_data(bytes(all_audio_data), msg)
                except Exception as e:
                    logger.error(f"从 URL 下载音频时出错: {e}")
                    # 如果 URL 下载失败,尝试使用收集的 base64 数据
                    if all_audio_data:
                        logger.info(f"使用收集的音频数据,大小: {len(all_audio_data)} 字节")
                        self.process_audio_data(bytes(all_audio_data), msg)
            elif all_audio_data:
                logger.info(f"使用收集的音频数据,大小: {len(all_audio_data)} 字节")
                self.process_audio_data(bytes(all_audio_data), msg)
            else:
                logger.warning("Qwen TTS 未收到音频数据")
               
        except requests.exceptions.RequestException as e:
            logger.error(f"Qwen TTS 网络请求错误: {e}")
        except Exception as e:
            logger.exception(f"Qwen TTS 未知错误: {e}")

    def process_audio_data(self, audio_data, msg):
        """处理完整的音频数据"""
        text, textevent = msg
       
        try:
            # 生成临时文件名,包含会话ID和时间戳
            timestamp = int(time.time())
            session_id = getattr(self.opt, ’sessionid’, 0)
            temp_file = os.path.join(self.temp_dir, f"qwen_audio_{session_id}_{timestamp}.wav")

            with open(temp_file, ’wb’) as f:
                f.write(audio_data)
            logger.info(f"音频数据已保存到: {temp_file}")
           
            # 使用 soundfile 读取音频
            byte_stream = BytesIO(audio_data)
           
            # 检查文件格式
            try:
                stream, sample_rate = sf.read(byte_stream)
                logger.info(f’Qwen TTS 音频流 {sample_rate}: {stream.shape})
            except Exception as e:
                logger.error(f"无法读取音频文件: {e}")
                # 可能是其他格式,尝试使用其他方法
                try:
                    # 重置字节流
                    byte_stream.seek(0)
                    # 尝试使用 torchaudio
                    import torchaudio
                    stream, sample_rate = torchaudio.load(byte_stream)
                    stream = stream.numpy()[0]  # 取第一个声道
                    logger.info(f’使用 torchaudio 读取音频: {sample_rate}: {stream.shape})
                except Exception as e2:
                    logger.error(f"torchaudio 也无法读取: {e2}")
                    # 发送结束事件
                    end_event = {’type’: ’tts’, ’status’: ’end’, ’text’: text, ’msgevent’: textevent}
                    self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), end_event)
                    return
           
            stream = stream.astype(np.float32)

            if stream.ndim > 1:
                logger.info(f’音频有 {stream.shape[1]} 个声道,只使用第一个’)
                stream = stream[:, 0]
       
            if sample_rate != self.sample_rate and stream.shape[0] > 0:
                logger.info(f’音频采样率为 {sample_rate}, 重采样为 {self.sample_rate})
                stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)

            # 发送开始事件
            start_event = {’type’: ’tts’, ’status’: ’start’, ’text’: text, ’msgevent’: textevent}
            self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), start_event)
           
            # 分割并发送音频帧
            streamlen = stream.shape[0]
            idx = 0
           
            while streamlen >= self.chunk and self.state == State.RUNNING:
                self.parent.put_audio_frame(stream[idx:idx+self.chunk], None)
                streamlen -= self.chunk
                idx += self.chunk
           
            # 处理剩余数据
            if streamlen > 0 and self.state == State.RUNNING:
                padding = np.zeros(self.chunk - streamlen, dtype=np.float32)
                complete_chunk = np.concatenate((stream[idx:], padding))
                self.parent.put_audio_frame(complete_chunk, None)
           
            # 发送结束事件
            end_event = {’type’: ’tts’, ’status’: ’end’, ’text’: text, ’msgevent’: textevent}
            self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), end_event)
           
            logger.info("Qwen TTS 音频处理完成")
           
        except Exception as e:
            logger.error(f"处理 Qwen TTS 音频数据时出错: {e}")
            # 确保发送结束事件
            end_event = {’type’: ’tts’, ’status’: ’end’, ’text’: text, ’msgevent’: textevent}
            self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), end_event)

###########################################################################################


2. basereal.py 新增

        if opt.tts == "qwen":
            from ttsreal import QwenTTS
            self.tts = QwenTTS(opt, self)
        elif opt.tts == "volcengine":
            from ttsreal import VolcengineTTS
            self.tts = VolcengineTTS(opt, self)
        elif opt.tts == "local":
            from ttsreal import LocalTTS
            self.tts = LocalTTS(opt, self)
        elif opt.tts == "edgetts":
            self.tts = EdgeTTS(opt, self)
        elif opt.tts == "gpt-sovits":
            self.tts = SovitsTTS(opt,self)
        elif opt.tts == "xtts":
            self.tts = XTTS(opt,self)
        elif opt.tts == "cosyvoice":
            self.tts = CosyVoiceTTS(opt,self)
        elif opt.tts == "fishtts":
            self.tts = FishTTS(opt,self)
        elif opt.tts == "tencent":
            self.tts = TencentTTS(opt,self)


3. appweb.py 新增

    # 在 parser.add_argument 部分添加以下参数
    parser.add_argument(’--QWEN_API_KEY’, type=str, default=’’, help="阿里云 Qwen TTS API Key")
    parser.add_argument(’--QWEN_MODEL’, type=str, default=’qwen3-tts-flash’, help="Qwen TTS 模型名称")
    parser.add_argument(’--QWEN_VOICE’, type=str, default=’Cherry’, help="Qwen TTS 音色")
    parser.add_argument(’--QWEN_LANGUAGE_TYPE’, type=str, default=’Chinese’, help="Qwen TTS 语言类型")  


启动命令调整:
conda activate nerfstream
python appweb.py --transport webrtc --model wav2lip --avatar_id wav2lip_avatar7 --listenport 443  --batch_size 8 --tts qwen --QWEN_API_KEY "sk-f***********************************"



  业务实施流程

需求调研 →

团队组建和动员 →

数据初始化 →

调试完善 →

解决方案和选型 →

硬件网络部署 →

系统部署试运行 →

系统正式上线 →

合作协议

系统开发/整合

制作文档和员工培训

售后服务

马上咨询: 如果您有业务方面的问题或者需求,欢迎您咨询!我们带来的不仅仅是技术,还有行业经验积累。
QQ: 39764417/308460098     Phone: 13 9800 1 9844 / 135 6887 9550     联系人:石先生/雷先生