.venv/
__pycache__/
*.pyc
app.log
.idea/
.vscode/
"""
MuseTalk Digital Human Test Client
Orchestrates: Qwen LLM + Tencent TTS + Tencent ASR + MuseTalk Streaming Service
Usage:
pip install -r requirements.txt
python app.py
Then open http://localhost:8008 in a browser
"""
import os
import json
import asyncio
import threading
import logging
import re
import time
import requests
import uvicorn
import websockets
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from openai import OpenAI
from tencent_speech import TencentStreamingTTS, TencentRealtimeASR
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler(os.path.join(os.path.dirname(__file__), "app.log"), encoding="utf-8"),
],
)
logger = logging.getLogger(__name__)
# ================================================================== #
# Configuration
# ================================================================== #
MUSETALK_URL = "http://10.10.0.102:8001"
MUSETALK_WS_URL = "ws://10.10.0.102:8001"
LLM_API_KEY = "..."
LLM_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
LLM_MODEL = "qwen-plus"
# Tencent Cloud TTS & ASR
TENCENT_APP_ID = "1312256669"
TENCENT_SECRET_ID = "AKIDMcu68yc4QdlhPAxXxCJJDyWs1weATZS8"
TENCENT_SECRET_KEY = "XryseDcppiRCsU2VovxwoV2MWWN1VCk9"
TENCENT_TTS_VOICE_TYPE = "502004"
TENCENT_TTS_SPEED = 0.8
TENCENT_TTS_VOLUME = 5
AVATARS = {
"psychologist": {
"name": "心理咨询师",
"video": r"d:\work\fusion\Avatar\MuseTalk\psychologist25f.mp4",
},
"sun": {
"name": "孙老师",
"video": r"d:\work\fusion\Avatar\MuseTalk\sun.mp4",
},
"jingjing": {
"name": "静静",
"video": r"d:\work\fusion\Avatar\MuseTalk\zhayan25f.mp4",
},
}
DEFAULT_AVATAR_ID = "jingjing"
SYSTEM_PROMPT = (
"你是一名医院前台接待员,负责回答患者的问题、指引就诊流程、介绍医院科室等。"
"请用简洁、友好、专业的语气回答,每次回答控制在50字以内。"
"回答中不要使用表情符号和特殊符号。"
)
# ================================================================== #
# LLM Client (Qwen) - mutable runtime config
# ================================================================== #
llm_config = {
"api_key": LLM_API_KEY,
"base_url": LLM_BASE_URL,
"model": LLM_MODEL,
"system_prompt": SYSTEM_PROMPT,
}
llm_client = OpenAI(api_key=LLM_API_KEY, base_url=LLM_BASE_URL)
SENTENCE_DELIMITERS = set("。!?;\n")
_EMOJI_RE = re.compile(
"[\U00010000-\U0010ffff" # supplementary planes (emoji)
"\U0000200d" # zero width joiner
"\U00002600-\U000027bf" # misc symbols
"\U0000fe00-\U0000fe0f" # variation selectors (incl. U+FE0F)
"\uff5e~]", # fullwidth and ASCII tilde
flags=re.UNICODE,
)
def clean_text_for_tts(text: str) -> str:
"""Remove emoji and special chars that TTS can't handle."""
return _EMOJI_RE.sub("", text).strip()
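# e.g. clean_text_for_tts("好的👌~") -> "好的"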
# ================================================================== #
# LLM + TTS Streaming Pipeline
# ================================================================== #
def stream_llm_and_tts(
user_text: str,
history: list[dict],
loop: asyncio.AbstractEventLoop,
audio_queue: asyncio.Queue,
text_queue: asyncio.Queue,
t_start: float,
):
"""Stream LLM → feed sentences to Tencent TTS → push PCM to audio_queue.
Runs in a background thread. Returns full reply text.
"""
messages = [{"role": "system", "content": llm_config["system_prompt"]}]
messages.extend(history)
messages.append({"role": "user", "content": user_text})
# Create Tencent TTS
tts = TencentStreamingTTS(
app_id=TENCENT_APP_ID,
secret_id=TENCENT_SECRET_ID,
secret_key=TENCENT_SECRET_KEY,
voice_type=TENCENT_TTS_VOICE_TYPE,
speed=TENCENT_TTS_SPEED,
volume=TENCENT_TTS_VOLUME,
sample_rate=16000,
)
first_audio = [True]
def on_audio(pcm_bytes):
if first_audio[0]:
logger.info("[TIMING] TTS first audio chunk: %.0fms", (time.time() - t_start) * 1000)
first_audio[0] = False
loop.call_soon_threadsafe(audio_queue.put_nowait, pcm_bytes)
# Connect TTS in parallel with LLM call
tts_connected = [False]
tts_error = [None]
def _connect_tts():
try:
tts.connect(on_audio)
tts_connected[0] = True
logger.info("[TIMING] TTS connected: %.0fms", (time.time() - t_start) * 1000)
except Exception as e:
tts_error[0] = e
logger.error("TTS connect failed: %s", e)
tts_thread = threading.Thread(target=_connect_tts)
tts_thread.start()
# Stream LLM response
full_reply = ""
sentence_buffer = ""
first_token = True
first_tts_feed = True
try:
stream = llm_client.chat.completions.create(
model=llm_config["model"],
messages=messages,
stream=True,
)
for chunk in stream:
content = chunk.choices[0].delta.content or ""
if not content:
continue
full_reply += content
sentence_buffer += content
if first_token:
logger.info("[TIMING] LLM first token: %.0fms | %r", (time.time() - t_start) * 1000, content)
first_token = False
# Push text chunk to frontend for streaming display
loop.call_soon_threadsafe(text_queue.put_nowait, content)
# Split at sentence delimiters: feed complete sentences, keep remainder
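# e.g. buffer "您好。请问您哪里不舒服" → feed "您好。" to TTS, keep "请问您哪里不舒服" buffered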
while any(d in sentence_buffer for d in SENTENCE_DELIMITERS):
# Find the first delimiter position
split_pos = len(sentence_buffer)
for d in SENTENCE_DELIMITERS:
idx = sentence_buffer.find(d)
if idx != -1 and idx + 1 < split_pos:
split_pos = idx + 1
sentence = sentence_buffer[:split_pos]
sentence_buffer = sentence_buffer[split_pos:]
if sentence.strip():
if first_tts_feed:
logger.info("[TIMING] First sentence ready: %.0fms", (time.time() - t_start) * 1000)
first_tts_feed = False
tts_thread.join()
if tts_error[0]:
break
clean = clean_text_for_tts(sentence)
if clean:
logger.info("Feeding to TTS: %s", clean)
tts.send_text(clean)
except Exception:
logger.exception("LLM streaming error")
# Ensure TTS connected
if not tts_connected[0]:
tts_thread.join()
if sentence_buffer.strip() and tts_connected[0]:
if first_tts_feed:
logger.info("[TIMING] First sentence ready: %.0fms", (time.time() - t_start) * 1000)
clean = clean_text_for_tts(sentence_buffer)
if clean:
logger.info("Feeding to TTS (final): %s", clean)
tts.send_text(clean)
logger.info("[TIMING] LLM complete: %.0fms", (time.time() - t_start) * 1000)
if tts_connected[0]:
tts.complete()
loop.call_soon_threadsafe(audio_queue.put_nowait, None)
loop.call_soon_threadsafe(text_queue.put_nowait, None) # signal end
logger.info("[TIMING] TTS complete: %.0fms", (time.time() - t_start) * 1000)
return full_reply
# ================================================================== #
# ASR Helper
# ================================================================== #
def run_asr_session(
loop: asyncio.AbstractEventLoop,
audio_input_queue: asyncio.Queue,
text_output_queue: asyncio.Queue,
):
"""Run Tencent ASR in a background thread.
Reads PCM chunks from audio_input_queue, sends to ASR, pushes transcription to text_output_queue.
Send None to audio_input_queue to stop.
"""
asr = TencentRealtimeASR(
app_id=TENCENT_APP_ID,
secret_id=TENCENT_SECRET_ID,
secret_key=TENCENT_SECRET_KEY,
engine_type="16k_zh_large",
vad_silence_time=800,
)
try:
asr.connect()
logger.info("ASR connected")
stop_flag = threading.Event()
def _recv_results():
"""Receive ASR results. When VAD detects silence (slice_type==2), signal done."""
while not stop_flag.is_set():
result = asr.recv_result()
if not result or result.get("code") != 0:
continue
res = result.get("result", {})
voice_text = res.get("voice_text_str", "")
slice_type = res.get("slice_type", 0)
if voice_text:
is_final = slice_type == 2
loop.call_soon_threadsafe(
text_output_queue.put_nowait,
{"text": voice_text, "is_final": is_final},
)
if is_final:
logger.info("ASR VAD end-of-speech: %s", voice_text)
loop.call_soon_threadsafe(
text_output_queue.put_nowait,
{"text": voice_text, "is_final": True, "done": True},
)
return
recv_thread = threading.Thread(target=_recv_results, daemon=True)
recv_thread.start()
# Send audio chunks, stop when recv detects end-of-speech or manual stop
while not stop_flag.is_set():
try:
chunk = audio_input_queue.get_nowait()
except asyncio.QueueEmpty:
time.sleep(0.02)
if not recv_thread.is_alive():
break # VAD triggered, stop sending
continue
if chunk is None:
break
asr.send_audio(chunk)
if not recv_thread.is_alive():
break
if recv_thread.is_alive():
# Manual stop
stop_flag.set()
results = asr.close()
recv_thread.join(timeout=3)
final_text = ""
for r in results:
if r.get("code") == 0:
vt = r.get("result", {}).get("voice_text_str", "")
if vt:
final_text = vt
if final_text:
loop.call_soon_threadsafe(
text_output_queue.put_nowait,
{"text": final_text, "is_final": True, "done": True},
)
logger.info("ASR manual stop: %s", final_text)
else:
try:
asr.close()
except Exception:
pass
logger.info("ASR ended (VAD auto-detect)")
except Exception:
logger.exception("ASR error")
loop.call_soon_threadsafe(
text_output_queue.put_nowait,
{"text": "", "is_final": True, "done": True, "error": True},
)
# ================================================================== #
# MuseTalk Helpers
# ================================================================== #
def upload_avatar(video_path: str, avatar_id: str) -> dict:
with open(video_path, "rb") as f:
resp = requests.post(
f"{MUSETALK_URL}/avatars",
files={"video": (os.path.basename(video_path), f)},
data={"avatar_id": avatar_id},
timeout=60,
)
return resp.json()
def get_avatar_status(avatar_id: str) -> dict:
resp = requests.get(f"{MUSETALK_URL}/avatars/{avatar_id}/status", timeout=10)
if resp.status_code == 404:
return {"status": "not_found"}
return resp.json()
def create_musetalk_session(avatar_id: str) -> dict:
resp = requests.post(
f"{MUSETALK_URL}/sessions",
data={"avatar_id": avatar_id},
timeout=10,
)
if resp.status_code != 200:
raise RuntimeError(f"Failed to create session: {resp.status_code} {resp.text}")
return resp.json()
def close_musetalk_session(session_id: str):
requests.delete(f"{MUSETALK_URL}/sessions/{session_id}", timeout=10)
# ================================================================== #
# FastAPI App
# ================================================================== #
app = FastAPI()
@app.get("/")
async def index():
html_path = os.path.join(os.path.dirname(__file__), "index.html")
with open(html_path, "r", encoding="utf-8") as f:
return HTMLResponse(f.read())
@app.get("/avatars")
async def list_avatars():
return {aid: {"name": info["name"]} for aid, info in AVATARS.items()}
@app.get("/llm_config")
async def get_llm_config():
return {
"api_key": llm_config["api_key"],
"base_url": llm_config["base_url"],
"model": llm_config["model"],
"system_prompt": llm_config["system_prompt"],
}
@app.post("/llm_config")
async def update_llm_config(req: dict):
global llm_client
changed = []
if "system_prompt" in req:
llm_config["system_prompt"] = req["system_prompt"]
changed.append("system_prompt")
if "model" in req:
llm_config["model"] = req["model"]
changed.append("model")
if "api_key" in req:
llm_config["api_key"] = req["api_key"]
llm_client = OpenAI(api_key=req["api_key"], base_url=llm_config["base_url"])
changed.append("api_key")
if "base_url" in req:
llm_config["base_url"] = req["base_url"]
llm_client = OpenAI(api_key=llm_config["api_key"], base_url=req["base_url"])
changed.append("base_url")
logger.info("LLM config updated: %s", changed)
return {"updated": changed}
async def ensure_avatar_ready(avatar_id: str, send_status):
avatar_info = AVATARS[avatar_id]
loop = asyncio.get_running_loop()
status = await loop.run_in_executor(None, get_avatar_status, avatar_id)
if status.get("status") == "not_found":
await send_status(f"正在上传形象「{avatar_info['name']}」...")
await loop.run_in_executor(None, upload_avatar, avatar_info["video"], avatar_id)
status = {"status": "processing"}
if status.get("status") == "processing":
await send_status(f"正在预处理「{avatar_info['name']}」,请耐心等待...")
while True:
await asyncio.sleep(3)
status = await loop.run_in_executor(None, get_avatar_status, avatar_id)
s = status.get("status")
if s == "ready":
break
elif s == "failed":
raise RuntimeError(f"形象预处理失败: {status.get('error', 'unknown')}")
await send_status(f"预处理中... (状态: {s})")
return True
async def start_musetalk_session(avatar_id: str, send_status):
loop = asyncio.get_running_loop()
await send_status("正在创建会话...")
session_info = await loop.run_in_executor(None, create_musetalk_session, avatar_id)
session_id = session_info["session_id"]
logger.info("MuseTalk session created: %s (avatar=%s)", session_id, avatar_id)
ws_url = f"{MUSETALK_WS_URL}/ws/sessions/{session_id}"
ws = await websockets.connect(ws_url, ping_interval=None)
await ws.send(json.dumps({"type": "config", "audio_format": "pcm"}))
await ws.send(json.dumps({"type": "start"}))
return session_id, ws
async def cleanup_session(session_id, musetalk_ws):
if musetalk_ws:
try:
await musetalk_ws.close()
except Exception:
pass
if session_id:
try:
await asyncio.get_running_loop().run_in_executor(None, close_musetalk_session, session_id)
except Exception:
pass
# ================================================================== #
# WebSocket Endpoint
# ================================================================== #
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
logger.info("Frontend WebSocket connected")
chat_history: list[dict] = []
session_id = None
musetalk_ws = None
relay_task = None
current_avatar_id = None
ws_send_lock = asyncio.Lock()
# Audio-video sync
audio_sync_buffer = bytearray()
BYTES_PER_FRAME = 1280 # 640 samples * 2 bytes (16kHz/25fps)
sync_mode = False # False=fast mode, True=sync mode
# ASR state
asr_audio_queue = None
asr_text_queue = None
asr_thread = None
asr_poll_task = None
ws_closed = False
async def ws_send_text(data: str):
if ws_closed:
return
try:
async with ws_send_lock:
await websocket.send_text(data)
except Exception:
pass
async def ws_send_bytes(data: bytes):
if ws_closed:
return
try:
async with ws_send_lock:
await websocket.send_bytes(data)
except Exception:
pass
async def send_status(msg: str):
await ws_send_text(json.dumps({"type": "status", "message": msg}))
async def start_relay():
"""Relay video frames from MuseTalk to frontend."""
nonlocal audio_sync_buffer
try:
async for message in musetalk_ws:
if not isinstance(message, bytes) or len(message) < 2:
continue
tag = message[0]
jpeg_data = message[1:]
if sync_mode and tag == 0x01 and len(audio_sync_buffer) >= BYTES_PER_FRAME:
# SYNC mode: pair lip-sync frame with audio slice
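# Wire format: b"SYNC" | u32 big-endian audio length | PCM slice | JPEG (parsed in ws.onmessage in index.html)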
audio_slice = bytes(audio_sync_buffer[:BYTES_PER_FRAME])
del audio_sync_buffer[:BYTES_PER_FRAME]
packet = b"SYNC" + len(audio_slice).to_bytes(4, "big") + audio_slice + jpeg_data
await ws_send_bytes(packet)
else:
# Fast mode or idle frame: just forward JPEG
await ws_send_bytes(jpeg_data)
except Exception: # includes websockets.ConnectionClosed
logger.debug("Frame relay ended")
async def switch_avatar(avatar_id: str):
nonlocal session_id, musetalk_ws, relay_task, current_avatar_id
if relay_task and not relay_task.done():
relay_task.cancel()
await cleanup_session(session_id, musetalk_ws)
session_id = None
musetalk_ws = None
await ensure_avatar_ready(avatar_id, send_status)
session_id, musetalk_ws = await start_musetalk_session(avatar_id, send_status)
current_avatar_id = avatar_id
relay_task = asyncio.create_task(start_relay())
avatar_name = AVATARS[avatar_id]["name"]
await send_status(f"「{avatar_name}」已就绪,可以开始对话!")
await ws_send_text(json.dumps({
"type": "ready",
"avatar_id": avatar_id,
"avatar_name": avatar_name,
}))
async def handle_chat(user_text: str):
nonlocal audio_sync_buffer, chat_history
t_start = time.time()
logger.info("[TIMING] User input: %s", user_text)
await send_status("正在回复...")
loop = asyncio.get_running_loop()
audio_queue: asyncio.Queue[bytes | None] = asyncio.Queue()
text_queue: asyncio.Queue[str | None] = asyncio.Queue()
future = loop.run_in_executor(
None, stream_llm_and_tts,
user_text, chat_history, loop, audio_queue, text_queue, t_start,
)
# Stream text chunks to frontend
async def _stream_text():
while True:
text_chunk = await text_queue.get()
if text_chunk is None:
break
await ws_send_text(json.dumps({"type": "reply_chunk", "content": text_chunk}))
await ws_send_text(json.dumps({"type": "reply_end"}))
text_task = asyncio.create_task(_stream_text())
# Forward audio to MuseTalk
chunk_count = 0
first_audio_pending = True
while True:
chunk = await audio_queue.get()
if chunk is None:
break
if first_audio_pending:
logger.info("[TIMING] First audio to MuseTalk: %.0fms", (time.time() - t_start) * 1000)
first_audio_pending = False
# Send to MuseTalk for lip-sync generation
await musetalk_ws.send(chunk)
if sync_mode:
# SYNC mode: buffer audio, relay task will pair with frames
audio_sync_buffer.extend(chunk)
else:
# Fast mode: send audio directly to frontend
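# framed as b"AUDIO" + raw PCM; the browser strips the 5-byte marker (see isAudioMessage)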
await ws_send_bytes(b"AUDIO" + chunk)
chunk_count += 1
reply = await future
await text_task
logger.info("[TIMING] All done: %.0fms | chunks=%d", (time.time() - t_start) * 1000, chunk_count)
# Tell frontend all audio has been sent, safe to start checking playback completion
await ws_send_text(json.dumps({"type": "audio_done"}))
chat_history.append({"role": "user", "content": user_text})
chat_history.append({"role": "assistant", "content": reply})
if len(chat_history) > 20:
chat_history = chat_history[-20:]
await send_status("就绪")
async def _poll_asr():
nonlocal asr_audio_queue, asr_text_queue, asr_thread, asr_poll_task
while not ws_closed:
try:
q = asr_text_queue
if q is None:
await asyncio.sleep(0.1)
continue
result = await asyncio.wait_for(q.get(), timeout=0.1)
text = result.get("text", "")
is_final = result.get("is_final", False)
await ws_send_text(json.dumps({
"type": "asr_result",
"text": text,
"is_final": is_final,
}))
if result.get("done") and text.strip():
logger.info("ASR final: %s", text)
if asr_audio_queue:
asr_audio_queue.put_nowait(None)
asr_audio_queue = None
asr_text_queue = None
await ws_send_text(json.dumps({"type": "asr_committed", "text": text}))
await handle_chat(text)
logger.info("Chat done, waiting for frontend to signal resume")
elif result.get("done") and not text.strip():
if asr_audio_queue:
asr_audio_queue.put_nowait(None)
asr_audio_queue = None
await asyncio.sleep(0.3)
asr_audio_queue = asyncio.Queue()
asr_text_queue = asyncio.Queue()
loop = asyncio.get_running_loop()
asr_thread = threading.Thread(
target=run_asr_session,
args=(loop, asr_audio_queue, asr_text_queue),
daemon=True,
)
asr_thread.start()
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
return
except Exception:
logger.exception("ASR poll error")
break
try:
await switch_avatar(DEFAULT_AVATAR_ID)
while True:
message = await websocket.receive()
# Text message: JSON commands
if "text" in message:
msg = json.loads(message["text"])
msg_type = msg.get("type")
if msg_type == "switch_avatar":
avatar_id = msg.get("avatar_id")
if avatar_id in AVATARS and avatar_id != current_avatar_id:
logger.info("Switching avatar to: %s", avatar_id)
chat_history.clear()
await switch_avatar(avatar_id)
elif msg_type == "chat":
await handle_chat(msg["text"])
elif msg_type == "asr_start":
# Start continuous ASR listening
asr_audio_queue = asyncio.Queue()
asr_text_queue = asyncio.Queue()
loop = asyncio.get_running_loop()
asr_thread = threading.Thread(
target=run_asr_session,
args=(loop, asr_audio_queue, asr_text_queue),
daemon=True,
)
asr_thread.start()
logger.info("ASR session started")
# Start or restart poll task
if asr_poll_task and not asr_poll_task.done():
asr_poll_task.cancel()
asr_poll_task = asyncio.create_task(_poll_asr())
elif msg_type == "set_sync_mode":
sync_mode = msg.get("enabled", False)
logger.info("Sync mode: %s", "ON" if sync_mode else "OFF")
elif msg_type == "resume_listening":
# Frontend audio playback finished, safe to restart ASR
if asr_audio_queue is None:
asr_audio_queue = asyncio.Queue()
asr_text_queue = asyncio.Queue()
loop = asyncio.get_running_loop()
asr_thread = threading.Thread(
target=run_asr_session,
args=(loop, asr_audio_queue, asr_text_queue),
daemon=True,
)
asr_thread.start()
# Restart poll task too
if asr_poll_task and not asr_poll_task.done():
asr_poll_task.cancel()
asr_poll_task = asyncio.create_task(_poll_asr())
await ws_send_text(json.dumps({"type": "asr_restarted"}))
logger.info("ASR restarted (frontend audio finished)")
elif msg_type == "asr_stop":
if asr_audio_queue:
asr_audio_queue.put_nowait(None)
asr_audio_queue = None
logger.info("ASR stopped")
# Binary message: ASR audio data from microphone
elif "bytes" in message:
if asr_audio_queue:
asr_audio_queue.put_nowait(message["bytes"])
except WebSocketDisconnect:
logger.info("Frontend disconnected")
except Exception:
logger.exception("WebSocket error")
finally:
ws_closed = True
# Stop ASR
if asr_audio_queue:
asr_audio_queue.put_nowait(None)
if asr_poll_task and not asr_poll_task.done():
asr_poll_task.cancel()
# Stop relay
if relay_task and not relay_task.done():
relay_task.cancel()
await cleanup_session(session_id, musetalk_ws)
logger.info("Session cleaned up")
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8008, log_level="info")
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>数字人前台 - 测试</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
background: #1a1a2e;
color: #eee;
height: 100vh;
display: flex;
flex-direction: column;
}
/* Header */
.header {
background: #16213e;
padding: 12px 24px;
display: flex;
align-items: center;
justify-content: space-between;
border-bottom: 1px solid #0f3460;
}
.header h1 { font-size: 18px; color: #e94560; }
.header-right {
display: flex;
align-items: center;
gap: 12px;
}
.avatar-selector {
display: flex;
gap: 6px;
}
.avatar-btn {
padding: 5px 14px;
border: 1px solid #0f3460;
border-radius: 6px;
background: #1a1a2e;
color: #aaa;
font-size: 13px;
cursor: pointer;
}
.avatar-btn:hover { border-color: #e94560; color: #eee; }
.avatar-btn.active {
background: #e94560;
border-color: #e94560;
color: #fff;
}
.avatar-btn:disabled { opacity: 0.5; cursor: not-allowed; }
.status-bar {
font-size: 13px;
color: #aaa;
padding: 4px 12px;
background: #0f3460;
border-radius: 12px;
}
.status-bar.ready { color: #4ecca3; }
/* Main Content */
.main {
flex: 1;
display: flex;
overflow: hidden;
}
/* Video Area */
.video-area {
flex: 1;
display: flex;
align-items: center;
justify-content: center;
background: #0f0f23;
position: relative;
}
#avatar-video {
max-width: 100%;
max-height: 100%;
border-radius: 8px;
}
.video-placeholder {
color: #555;
font-size: 16px;
text-align: center;
}
.fps-counter {
position: absolute;
top: 8px;
right: 8px;
font-size: 12px;
color: #4ecca3;
background: rgba(0,0,0,0.6);
padding: 2px 8px;
border-radius: 4px;
}
/* Chat Area */
.chat-area {
width: 400px;
display: flex;
flex-direction: column;
background: #16213e;
border-left: 1px solid #0f3460;
}
.chat-messages {
flex: 1;
overflow-y: auto;
padding: 16px;
}
.message {
margin-bottom: 12px;
max-width: 90%;
}
.message.user {
margin-left: auto;
text-align: right;
}
.message .bubble {
display: inline-block;
padding: 8px 14px;
border-radius: 12px;
font-size: 14px;
line-height: 1.5;
word-break: break-word;
}
.message.user .bubble {
background: #e94560;
color: #fff;
border-bottom-right-radius: 4px;
}
.message.assistant .bubble {
background: #0f3460;
color: #eee;
border-bottom-left-radius: 4px;
}
.message .label {
font-size: 11px;
color: #777;
margin-bottom: 4px;
}
/* Input Area */
.input-area {
padding: 12px 16px;
border-top: 1px solid #0f3460;
display: flex;
gap: 8px;
}
.input-area input {
flex: 1;
padding: 10px 14px;
border: 1px solid #0f3460;
border-radius: 8px;
background: #1a1a2e;
color: #eee;
font-size: 14px;
outline: none;
}
.input-area input:focus {
border-color: #e94560;
}
.input-area input:disabled {
opacity: 0.5;
}
.input-area button {
padding: 10px 20px;
border: none;
border-radius: 8px;
background: #e94560;
color: #fff;
font-size: 14px;
cursor: pointer;
}
.input-area button:hover { background: #c73550; }
.input-area button:disabled { opacity: 0.5; cursor: not-allowed; }
.mic-btn {
padding: 10px 14px;
border: 2px solid #0f3460;
border-radius: 8px;
background: #1a1a2e;
color: #aaa;
font-size: 16px;
cursor: pointer;
}
.mic-btn:hover { border-color: #e94560; color: #eee; }
.mic-btn.recording {
background: #e94560;
border-color: #e94560;
color: #fff;
animation: pulse 1s infinite;
}
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.6; }
}
.asr-preview {
font-size: 12px;
color: #888;
padding: 4px 14px;
min-height: 18px;
}
/* Settings Panel */
.settings-toggle {
padding: 5px 12px;
border: 1px solid #0f3460;
border-radius: 6px;
background: #1a1a2e;
color: #aaa;
font-size: 13px;
cursor: pointer;
}
.settings-toggle:hover { border-color: #e94560; color: #eee; }
.settings-panel {
display: none;
position: absolute;
top: 50px;
right: 12px;
background: #16213e;
border: 1px solid #0f3460;
border-radius: 8px;
padding: 16px;
width: 380px;
z-index: 100;
box-shadow: 0 4px 20px rgba(0,0,0,0.5);
}
.settings-panel.open { display: block; }
.settings-panel label {
display: block;
font-size: 12px;
color: #aaa;
margin-top: 10px;
margin-bottom: 4px;
}
.settings-panel label:first-child { margin-top: 0; }
.settings-panel input, .settings-panel textarea {
width: 100%;
padding: 6px 10px;
border: 1px solid #0f3460;
border-radius: 6px;
background: #1a1a2e;
color: #eee;
font-size: 13px;
font-family: inherit;
}
.settings-panel textarea {
height: 80px;
resize: vertical;
}
.settings-panel input:focus, .settings-panel textarea:focus {
border-color: #e94560;
outline: none;
}
.settings-save {
margin-top: 12px;
padding: 6px 16px;
border: none;
border-radius: 6px;
background: #e94560;
color: #fff;
font-size: 13px;
cursor: pointer;
}
.settings-save:hover { background: #c73550; }
.settings-msg {
font-size: 12px;
color: #4ecca3;
margin-left: 10px;
}
</style>
</head>
<body>
<div class="header">
<h1>数字人前台</h1>
<div class="header-right">
<div class="avatar-selector" id="avatar-selector"></div>
<button class="avatar-btn" id="mode-btn" onclick="toggleSyncMode()">快速模式</button>
<button class="settings-toggle" onclick="toggleSettings()">设置</button>
<div class="status-bar" id="status">连接中...</div>
</div>
<div class="settings-panel" id="settings-panel">
<label>系统提示词</label>
<textarea id="cfg-prompt"></textarea>
<label>模型名称</label>
<input type="text" id="cfg-model" />
<label>API Base URL</label>
<input type="text" id="cfg-base-url" />
<label>API Key</label>
<input type="password" id="cfg-api-key" />
<div style="margin-top:12px">
<button class="settings-save" onclick="saveSettings()">保存</button>
<span class="settings-msg" id="settings-msg"></span>
</div>
</div>
</div>
<div class="main">
<div class="video-area">
<img id="avatar-video" style="display:none;" />
<div class="video-placeholder" id="placeholder">等待数字人加载...</div>
<div class="fps-counter" id="fps">0 fps</div>
</div>
<div class="chat-area">
<div class="chat-messages" id="messages"></div>
<div class="asr-preview" id="asr-preview"></div>
<div class="input-area">
<button class="mic-btn" id="mic-btn" onclick="toggleMic()" title="语音输入">🎤</button>
<input type="text" id="chat-input" placeholder="请输入您的问题..." disabled />
<button id="send-btn" onclick="sendChat()" disabled>发送</button>
</div>
</div>
</div>
<script>
const ws = new WebSocket(`ws://${location.host}/ws`);
const videoEl = document.getElementById('avatar-video');
const placeholder = document.getElementById('placeholder');
const statusEl = document.getElementById('status');
const messagesEl = document.getElementById('messages');
const inputEl = document.getElementById('chat-input');
const sendBtn = document.getElementById('send-btn');
const fpsEl = document.getElementById('fps');
const avatarSelector = document.getElementById('avatar-selector');
let isReady = false;
let isBusy = false;
let frameCount = 0;
let lastFpsTime = Date.now();
let currentAvatarId = null;
// Load avatar list
fetch('/avatars').then(r => r.json()).then(avatars => {
for (const [id, info] of Object.entries(avatars)) {
const btn = document.createElement('button');
btn.className = 'avatar-btn';
btn.textContent = info.name;
btn.dataset.avatarId = id;
btn.onclick = () => switchAvatar(id);
avatarSelector.appendChild(btn);
}
});
function switchAvatar(avatarId) {
if (avatarId === currentAvatarId || isBusy) return;
setInputEnabled(false);
isReady = false;
messagesEl.innerHTML = '';
ws.send(JSON.stringify({ type: 'switch_avatar', avatar_id: avatarId }));
}
function updateAvatarButtons(activeId) {
currentAvatarId = activeId;
document.querySelectorAll('.avatar-btn').forEach(btn => {
btn.classList.toggle('active', btn.dataset.avatarId === activeId);
});
}
// FPS counter
setInterval(() => {
const now = Date.now();
const elapsed = (now - lastFpsTime) / 1000;
const fps = Math.round(frameCount / elapsed);
fpsEl.textContent = `${fps} fps`;
frameCount = 0;
lastFpsTime = now;
}, 1000);
ws.binaryType = 'arraybuffer';
ws.onopen = () => {
updateStatus('已连接,正在初始化...');
};
ws.onclose = () => {
updateStatus('连接已断开');
setInputEnabled(false);
};
ws.onerror = () => {
updateStatus('连接错误');
};
// Audio playback - independent continuous stream (smooth, no jitter)
let audioCtx = null;
let audioPlaybackTime = 0;
let audioPlaying = false;
let audioCheckTimer = null;
let audioBufferQueue = [];
let audioStarted = false;
let resumeSent = false;
let hasAudioPending = false; // audio chunks received, waiting for first frame
function ensureAudioCtx() {
if (!audioCtx) {
audioCtx = new AudioContext({ sampleRate: 16000 });
}
if (audioCtx.state === 'suspended') {
audioCtx.resume();
}
}
function scheduleAudioChunk(pcmData) {
ensureAudioCtx();
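// PCM arrives as raw s16le bytes; copy into an aligned buffer so Int16Array
// can view it, then scale to [-1, 1) floats for Web Audio.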
const alignedBuf = new ArrayBuffer(pcmData.byteLength);
new Uint8Array(alignedBuf).set(pcmData);
const int16 = new Int16Array(alignedBuf);
const float32 = new Float32Array(int16.length);
for (let i = 0; i < int16.length; i++) {
float32[i] = int16[i] / 32768.0;
}
const audioBuffer = audioCtx.createBuffer(1, float32.length, 16000);
audioBuffer.getChannelData(0).set(float32);
const source = audioCtx.createBufferSource();
source.buffer = audioBuffer;
source.connect(audioCtx.destination);
const now = audioCtx.currentTime;
const startTime = Math.max(now, audioPlaybackTime);
source.start(startTime);
audioPlaybackTime = startTime + audioBuffer.duration;
}
function onAudioChunk(pcmData) {
audioPlaying = true;
if (audioStarted) {
scheduleAudioChunk(pcmData);
} else {
audioBufferQueue.push(pcmData);
hasAudioPending = true;
}
}
const AUDIO_SYNC_DELAY = 0.3; // seconds - wait for video buffer to fill before playing audio
function triggerAudioStart() {
if (audioStarted || !hasAudioPending) return;
audioStarted = true;
hasAudioPending = false;
// Delay audio start to let video frames accumulate
ensureAudioCtx();
audioPlaybackTime = audioCtx.currentTime + AUDIO_SYNC_DELAY;
for (const chunk of audioBufferQueue) {
scheduleAudioChunk(chunk);
}
audioBufferQueue = [];
}
function resetAudioPlayback() {
audioPlaybackTime = 0;
audioPlaying = false;
audioStarted = false;
audioBufferQueue = [];
hasAudioPending = false;
resumeSent = false;
}
function checkAudioFinished() {
if (audioCheckTimer) return;
resumeSent = false;
audioCheckTimer = setInterval(() => {
if (audioCtx && audioCtx.currentTime >= audioPlaybackTime && audioPlaybackTime > 0) {
audioPlaying = false;
clearInterval(audioCheckTimer);
audioCheckTimer = null;
if (isListening && !resumeSent) {
resumeSent = true;
ws.send(JSON.stringify({ type: 'resume_listening' }));
}
}
}, 100);
}
// ---- SYNC mode: jitter buffer playback ----
let isSyncMode = false;
let syncBuffer = []; // Array of {audio: Uint8Array, jpeg: Uint8Array}
let syncPlaying = false;
let syncTimer = null;
const SYNC_BUFFER_SIZE = 8; // buffer 8 frames (~320ms) before starting
function toggleSyncMode() {
isSyncMode = !isSyncMode;
const btn = document.getElementById('mode-btn');
btn.textContent = isSyncMode ? '同步模式' : '快速模式';
btn.classList.toggle('active', isSyncMode);
ws.send(JSON.stringify({ type: 'set_sync_mode', enabled: isSyncMode }));
// Reset state
syncBuffer = [];
if (syncTimer) { clearInterval(syncTimer); syncTimer = null; }
syncPlaying = false;
resetAudioPlayback();
}
function onSyncFrame(audio, jpeg) {
syncBuffer.push({ audio, jpeg });
if (!syncPlaying && syncBuffer.length >= SYNC_BUFFER_SIZE) {
startSyncPlayback();
}
}
function startSyncPlayback() {
if (syncPlaying) return;
syncPlaying = true;
ensureAudioCtx();
audioPlaybackTime = audioCtx.currentTime;
syncTimer = setInterval(() => {
if (syncBuffer.length === 0) {
clearInterval(syncTimer);
syncTimer = null;
syncPlaying = false;
return;
}
const frame = syncBuffer.shift();
displayJpeg(frame.jpeg);
scheduleAudioChunk(frame.audio);
}, 40); // 25fps
}
function stopSyncPlayback() {
syncBuffer = [];
if (syncTimer) { clearInterval(syncTimer); syncTimer = null; }
syncPlaying = false;
}
const SYNC_HEADER = [0x53, 0x59, 0x4E, 0x43]; // "SYNC"
function isSyncMessage(data) {
if (data.byteLength < 8) return false;
const h = new Uint8Array(data, 0, 4);
return h[0]===0x53 && h[1]===0x59 && h[2]===0x4E && h[3]===0x43;
}
const AUDIO_MARKER = [0x41, 0x55, 0x44, 0x49, 0x4F]; // "AUDIO"
function isAudioMessage(data) {
if (data.byteLength < 6) return false;
const h = new Uint8Array(data, 0, 5);
return h[0]===0x41 && h[1]===0x55 && h[2]===0x44 && h[3]===0x49 && h[4]===0x4F;
}
function displayJpeg(jpegData) {
const blob = new Blob([jpegData], { type: 'image/jpeg' });
const url = URL.createObjectURL(blob);
const oldUrl = videoEl.src;
videoEl.src = url;
videoEl.style.display = 'block';
placeholder.style.display = 'none';
if (oldUrl && oldUrl.startsWith('blob:')) {
URL.revokeObjectURL(oldUrl);
}
frameCount++;
}
ws.onmessage = (event) => {
if (event.data instanceof ArrayBuffer) {
if (isSyncMode && isSyncMessage(event.data)) {
// SYNC mode: paired audio+video frame → buffer for smooth playback
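// Packet layout mirrors the backend: "SYNC" | u32 big-endian audio length | PCM | JPEG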
const view = new DataView(event.data);
const audioLen = view.getUint32(4, false);
const audio = new Uint8Array(event.data, 8, audioLen);
const jpeg = new Uint8Array(event.data, 8 + audioLen);
onSyncFrame(audio, jpeg);
} else if (isAudioMessage(event.data)) {
// Fast mode: audio chunk → buffer or play
const pcm = new Uint8Array(event.data, 5);
onAudioChunk(pcm);
} else {
// JPEG frame (idle or fast mode)
displayJpeg(new Uint8Array(event.data));
if (!isSyncMode && hasAudioPending) triggerAudioStart();
}
return;
}
const msg = JSON.parse(event.data);
switch (msg.type) {
case 'status':
updateStatus(msg.message);
if (msg.message === '就绪') {
isBusy = false;
setInputEnabled(true);
}
break;
case 'ready':
isReady = true;
isBusy = false;
setInputEnabled(true);
updateStatus('就绪', true);
if (msg.avatar_id) updateAvatarButtons(msg.avatar_id);
break;
case 'reply_chunk':
appendReplyChunk(msg.content);
break;
case 'reply_end':
finalizeReply();
break;
case 'asr_result':
if (msg.text) updateAsrBubble(msg.text);
break;
case 'asr_committed':
finalizeAsrBubble(msg.text);
asrPreview.textContent = '正在回复...';
ensureAudioCtx();
isBusy = true;
setInputEnabled(false);
resetAudioPlayback();
break;
case 'asr_restarted':
if (isListening) asrPreview.textContent = '正在聆听...';
break;
case 'audio_done':
if (isSyncMode) {
// SYNC mode: start playback if buffer has frames but hasn't started
if (!syncPlaying && syncBuffer.length > 0) startSyncPlayback();
// Check when sync playback finishes
const checkSync = setInterval(() => {
if (!syncPlaying && syncBuffer.length === 0) {
clearInterval(checkSync);
// Wait for audio to finish too
checkAudioFinished();
}
}, 100);
} else {
checkAudioFinished();
}
asrPreview.textContent = '播报中...';
break;
}
};
function updateStatus(text, ready) {
statusEl.textContent = text;
if (ready) {
statusEl.classList.add('ready');
} else {
statusEl.classList.remove('ready');
}
}
function setInputEnabled(enabled) {
inputEl.disabled = !enabled;
sendBtn.disabled = !enabled;
if (enabled) inputEl.focus();
}
function addMessage(role, content) {
const div = document.createElement('div');
div.className = `message ${role}`;
div.innerHTML = `
<div class="label">${role === 'user' ? '' : '前台'}</div>
<div class="bubble">${escapeHtml(content)}</div>
`;
messagesEl.appendChild(div);
messagesEl.scrollTop = messagesEl.scrollHeight;
}
function escapeHtml(text) {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
// ASR live transcription bubble
let asrBubbleEl = null;
function updateAsrBubble(text) {
if (!asrBubbleEl) {
const div = document.createElement('div');
div.className = 'message user';
div.innerHTML = `
<div class="label">我</div>
<div class="bubble" style="opacity:0.6"></div>
`;
messagesEl.appendChild(div);
asrBubbleEl = div.querySelector('.bubble');
}
asrBubbleEl.textContent = text;
messagesEl.scrollTop = messagesEl.scrollHeight;
}
function finalizeAsrBubble(text) {
if (asrBubbleEl) {
asrBubbleEl.textContent = text;
asrBubbleEl.style.opacity = '1';
asrBubbleEl = null;
} else {
addMessage('user', text);
}
}
// Streaming reply display
let currentReplyBubble = null;
let currentReplyText = '';
function appendReplyChunk(text) {
if (!currentReplyBubble) {
// Create a new message bubble for streaming
const div = document.createElement('div');
div.className = 'message assistant';
div.innerHTML = `
<div class="label">前台</div>
<div class="bubble"></div>
`;
messagesEl.appendChild(div);
currentReplyBubble = div.querySelector('.bubble');
currentReplyText = '';
}
currentReplyText += text;
currentReplyBubble.textContent = currentReplyText;
messagesEl.scrollTop = messagesEl.scrollHeight;
}
function finalizeReply() {
currentReplyBubble = null;
currentReplyText = '';
}
function sendChat() {
const text = inputEl.value.trim();
if (!text || !isReady || isBusy) return;
ensureAudioCtx();
resetAudioPlayback();
addMessage('user', text);
ws.send(JSON.stringify({ type: 'chat', text: text }));
inputEl.value = '';
isBusy = true;
setInputEnabled(false);
}
// Enter key to send
inputEl.addEventListener('keydown', (e) => {
if (e.key === 'Enter') sendChat();
});
// ---- Settings Panel ----
function toggleSettings() {
document.getElementById('settings-panel').classList.toggle('open');
}
// Load current config on page load
fetch('/llm_config').then(r => r.json()).then(cfg => {
document.getElementById('cfg-prompt').value = cfg.system_prompt || '';
document.getElementById('cfg-model').value = cfg.model || '';
document.getElementById('cfg-base-url').value = cfg.base_url || '';
document.getElementById('cfg-api-key').value = cfg.api_key || '';
});
function saveSettings() {
const body = {};
const prompt = document.getElementById('cfg-prompt').value.trim();
const model = document.getElementById('cfg-model').value.trim();
const baseUrl = document.getElementById('cfg-base-url').value.trim();
const apiKey = document.getElementById('cfg-api-key').value.trim();
if (prompt) body.system_prompt = prompt;
if (model) body.model = model;
if (baseUrl) body.base_url = baseUrl;
if (apiKey) body.api_key = apiKey;
fetch('/llm_config', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
}).then(r => r.json()).then(res => {
const msg = document.getElementById('settings-msg');
msg.textContent = '已保存';
setTimeout(() => msg.textContent = '', 2000);
});
}
// ---- Microphone / ASR (auto-listen mode) ----
const micBtn = document.getElementById('mic-btn');
const asrPreview = document.getElementById('asr-preview');
let isListening = false;
let mediaStream = null;
let micAudioCtx = null;
let micProcessor = null;
let micSource = null;
async function toggleMic() {
if (isListening) {
stopListening();
} else {
await startListening();
}
}
async function startListening() {
try {
ensureAudioCtx();
mediaStream = await navigator.mediaDevices.getUserMedia({
audio: { sampleRate: 16000, channelCount: 1, echoCancellation: true, noiseSuppression: true }
});
micAudioCtx = new AudioContext({ sampleRate: 16000 });
micSource = micAudioCtx.createMediaStreamSource(mediaStream);
micProcessor = micAudioCtx.createScriptProcessor(4096, 1, 1);
micProcessor.onaudioprocess = (e) => {
if (!isListening || isBusy || audioPlaying) return;
const float32 = e.inputBuffer.getChannelData(0);
const int16 = new Int16Array(float32.length);
for (let i = 0; i < float32.length; i++) {
int16[i] = Math.max(-32768, Math.min(32767, Math.round(float32[i] * 32768)));
}
ws.send(int16.buffer);
};
micSource.connect(micProcessor);
micProcessor.connect(micAudioCtx.destination);
// Start ASR on backend
ws.send(JSON.stringify({ type: 'asr_start' }));
isListening = true;
micBtn.classList.add('recording');
micBtn.textContent = '🔴';
asrPreview.textContent = '正在聆听...';
} catch (err) {
console.error('Mic error:', err);
asrPreview.textContent = '麦克风访问失败';
}
}
function stopListening() {
isListening = false;
micBtn.classList.remove('recording');
micBtn.textContent = '🎤';
asrPreview.textContent = '';
if (mediaStream) {
mediaStream.getTracks().forEach(t => t.stop());
mediaStream = null;
}
if (micAudioCtx) {
micAudioCtx.close();
micAudioCtx = null;
}
ws.send(JSON.stringify({ type: 'asr_stop' }));
}
</script>
</body>
</html>
fastapi>=0.104.0
uvicorn>=0.24.0
websockets>=12.0
websocket-client>=1.6.0
openai>=1.0.0
requests>=2.31.0
@echo off
cd /d "%~dp0"
.venv\Scripts\python app.py
pause
"""
Tencent Cloud TTS (Streaming Text-to-Speech v2) and ASR (Real-time Speech Recognition)
"""
import base64
import hashlib
import hmac
import json
import logging
import random
import threading
import time
import uuid
from urllib.parse import quote
import websocket # websocket-client (sync)
logger = logging.getLogger(__name__)
def _generate_signature(params: dict, secret_key: str, host: str, path: str, prefix: str = "GET") -> str:
"""Generate HMAC-SHA1 signature for Tencent Cloud WebSocket API."""
sorted_keys = sorted(params.keys())
param_string = "&".join(f"{k}={params[k]}" for k in sorted_keys)
sign_str = f"{prefix}{host}{path}?{param_string}"
signature = hmac.new(
secret_key.encode(), sign_str.encode(), hashlib.sha1
).digest()
return quote(base64.b64encode(signature).decode())
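# Example: for the TTS endpoint the string signed is
# "GETtts.cloud.tencent.com/stream_wsv2?Action=...&AppId=...&..." (keys sorted,
# values unencoded); the return value is the URL-quoted base64 HMAC-SHA1 digest.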
# ================================================================== #
# Tencent TTS - Streaming Text-to-Speech v2
# ================================================================== #
class TencentStreamingTTS:
"""Tencent Cloud Streaming TTS using WebSocket v2.
Supports streaming text input (sentence by sentence from LLM).
Audio receiving runs in a background thread for non-blocking sends.
"""
def __init__(
self,
app_id: str,
secret_id: str,
secret_key: str,
voice_type: str = "502004",
speed: float = 0,
volume: int = 0,
sample_rate: int = 16000,
codec: str = "pcm",
):
self.app_id = app_id
self.secret_id = secret_id
self.secret_key = secret_key
self.voice_type = voice_type
self.speed = speed
self.volume = volume
self.sample_rate = sample_rate
self.codec = codec
self._ws = None
self._session_id = None
self._on_audio = None
self._recv_thread = None
self._done_event = threading.Event()
def connect(self, on_audio):
"""Connect to TTS WebSocket, start background audio receiver.
Args:
on_audio: callback(pcm_bytes) called for each audio chunk
"""
self._on_audio = on_audio
self._session_id = str(uuid.uuid4())
self._done_event.clear()
timestamp = int(time.time())
params = {
"Action": "TextToStreamAudioWSv2",
"AppId": str(self.app_id),
"Codec": self.codec,
"Expired": str(timestamp + 86400),
"SampleRate": str(self.sample_rate),
"SecretId": self.secret_id,
"SessionId": self._session_id,
"Speed": str(self.speed),
"Timestamp": str(timestamp),
"VoiceType": str(self.voice_type),
"Volume": str(self.volume),
}
sig = _generate_signature(
params, self.secret_key,
"tts.cloud.tencent.com", "/stream_wsv2",
)
query = "&".join(f"{k}={params[k]}" for k in sorted(params.keys()))
url = f"wss://tts.cloud.tencent.com/stream_wsv2?{query}&Signature={sig}"
self._ws = websocket.create_connection(url, timeout=10)
resp = json.loads(self._ws.recv())
if resp.get("code") != 0:
raise RuntimeError(f"TTS connect failed: {resp}")
logger.info("Tencent TTS connected (session=%s)", self._session_id)
# Start background thread to receive audio
self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
self._recv_thread.start()
def _recv_loop(self):
"""Background thread: continuously receive audio and metadata."""
while True:
try:
data = self._ws.recv()
except Exception:
break
if isinstance(data, bytes):
if self._on_audio:
self._on_audio(data)
else:
resp = json.loads(data)
if resp.get("code") != 0:
logger.error("TTS error: %s", resp)
break
if resp.get("final") == 1:
break
self._done_event.set()
def send_text(self, text: str):
"""Send a text chunk for synthesis (non-blocking, returns immediately)."""
if not self._ws:
raise RuntimeError("TTS not connected")
msg = {
"session_id": self._session_id,
"message_id": str(uuid.uuid4()),
"action": "ACTION_SYNTHESIS",
"data": text,
}
self._ws.send(json.dumps(msg))
def complete(self):
"""Signal end of text, wait for all audio, close connection."""
if not self._ws:
return
msg = {
"session_id": self._session_id,
"message_id": str(uuid.uuid4()),
"action": "ACTION_COMPLETE",
"data": "",
}
self._ws.send(json.dumps(msg))
# Wait for background receiver to finish
self._done_event.wait(timeout=30)
if self._recv_thread:
self._recv_thread.join(timeout=5)
self._ws.close()
self._ws = None
logger.info("Tencent TTS completed")
# ================================================================== #
# Tencent ASR - Real-time Speech Recognition
# ================================================================== #
class TencentRealtimeASR:
"""Tencent Cloud Real-time ASR using WebSocket.
Accepts PCM audio chunks and returns transcription text.
"""
def __init__(
self,
app_id: str,
secret_id: str,
secret_key: str,
engine_type: str = "16k_zh_large",
vad_silence_time: int = 800,
):
self.app_id = app_id
self.secret_id = secret_id
self.secret_key = secret_key
self.engine_type = engine_type
self.vad_silence_time = vad_silence_time
self._ws = None
def connect(self) -> str:
"""Connect to ASR WebSocket. Returns the voice_id on success."""
timestamp = int(time.time())
params = {
"secretid": self.secret_id,
"timestamp": str(timestamp),
"expired": str(timestamp + 86400),
"nonce": str(random.randint(1000000, 9999999)),
"engine_model_type": self.engine_type,
"voice_id": str(uuid.uuid4()),
"voice_format": "1", # PCM
"needvad": "1",
"vad_silence_time": str(self.vad_silence_time),
}
# ASR signing: no "GET" prefix, just host+path+params
sig = _generate_signature(
params, self.secret_key,
"asr.cloud.tencent.com", f"/asr/v2/{self.app_id}",
prefix="",
)
query = "&".join(f"{k}={params[k]}" for k in sorted(params.keys()))
url = f"wss://asr.cloud.tencent.com/asr/v2/{self.app_id}?{query}&signature={sig}"
self._ws = websocket.create_connection(url, timeout=10)
# Handshake response
resp = json.loads(self._ws.recv())
if resp.get("code") != 0:
raise RuntimeError(f"ASR connect failed: {resp}")
logger.info("Tencent ASR connected (voice_id=%s)", params["voice_id"])
return params["voice_id"]
def send_audio(self, pcm_chunk: bytes):
"""Send a PCM audio chunk for recognition."""
if self._ws:
self._ws.send_binary(pcm_chunk)
def recv_result(self) -> dict | None:
"""Non-blocking receive of recognition result. Returns None if no data."""
if not self._ws:
return None
self._ws.settimeout(0.05)
try:
data = self._ws.recv()
if isinstance(data, str):
return json.loads(data)
except websocket.WebSocketTimeoutException:
return None
except Exception:
return None
return None
def send_end(self):
"""Signal end of audio stream."""
if self._ws:
try:
end_msg = json.dumps({"type": "end"})
self._ws.send(end_msg)
except Exception:
pass
def close(self):
"""Close connection and get final result."""
results = []
if self._ws:
self.send_end()
# Drain remaining results
self._ws.settimeout(3)
while True:
try:
data = self._ws.recv()
if isinstance(data, str):
resp = json.loads(data)
results.append(resp)
if resp.get("final") == 1:
break
except Exception:
break
self._ws.close()
self._ws = None
return results
# Digital Human Test Client - Usage Guide
## Prerequisites
1. The MuseTalk streaming service is running on the server (`10.10.0.102:8001`)
2. A local virtual environment has been set up (the `.venv` directory exists with dependencies installed)
## Starting
```bash
cd D:\work\fusion\Avatar\MuseTalk\test_client
.venv\Scripts\python app.py
```
Or simply double-click `start.bat`.
Once started, open http://localhost:8008 in a browser.
## Workflow
1. On page load, the avatar video is automatically uploaded to the server and preprocessed (the first run takes a while)
2. Once preprocessing finishes, the avatar appears on the left and the chat panel on the right
3. Type a question in the input box, then press Enter or click Send
4. Pipeline: user text → Qwen LLM reply → Tencent streaming TTS → MuseTalk lip-sync (see the sketch below)
5. The avatar speaks the reply with synchronized lip movement while the transcript shows on the right
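The MuseTalk service itself is driven over plain HTTP + WebSocket. For reference, below is a minimal sketch of the session protocol as `app.py` uses it (message shapes are taken from `app.py`; the service address and an already-preprocessed avatar are assumed):
```python
import asyncio, json, requests, websockets

MUSETALK_URL = "http://10.10.0.102:8001"

async def demo(pcm_16k_mono: bytes, avatar_id: str = "jingjing"):
    # 1. Create a session for a preprocessed avatar (REST).
    resp = requests.post(f"{MUSETALK_URL}/sessions", data={"avatar_id": avatar_id}, timeout=10)
    session_id = resp.json()["session_id"]
    try:
        # 2. Open the session WebSocket, configure, and start.
        ws_url = f"ws://10.10.0.102:8001/ws/sessions/{session_id}"
        async with websockets.connect(ws_url, ping_interval=None) as ws:
            await ws.send(json.dumps({"type": "config", "audio_format": "pcm"}))
            await ws.send(json.dumps({"type": "start"}))
            # 3. Stream raw 16 kHz mono s16le PCM; frames come back as binary
            #    messages: a 1-byte tag (0x01 = lip-synced) followed by JPEG data.
            await ws.send(pcm_16k_mono)
            frame = await ws.recv()
            print(f"tag={frame[0]:#x}, jpeg={len(frame) - 1} bytes")
    finally:
        # 4. Tear the session down (REST).
        requests.delete(f"{MUSETALK_URL}/sessions/{session_id}", timeout=10)
```
Run with e.g. `asyncio.run(demo(pcm))`, where `pcm` is any 16 kHz mono 16-bit audio buffer.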
## Configuration
To change the configuration, edit the constants at the top of `app.py`:
| Constant | Description | Default |
|--------|------|--------|
| `MUSETALK_URL` / `MUSETALK_WS_URL` | MuseTalk service address | `http://10.10.0.102:8001` |
| `LLM_API_KEY` / `LLM_BASE_URL` / `LLM_MODEL` | Qwen LLM (DashScope-compatible) settings | `qwen-plus` |
| `TENCENT_APP_ID` / `TENCENT_SECRET_ID` / `TENCENT_SECRET_KEY` | Tencent Cloud TTS & ASR credentials | configured |
| `AVATARS` | Avatar IDs, display names, and source video paths | three presets |
| `DEFAULT_AVATAR_ID` | Avatar loaded on startup | `jingjing` |
| `SYSTEM_PROMPT` | LLM system prompt | hospital receptionist role |
## Reinstalling Dependencies
If `.venv` is corrupted or needs to be rebuilt:
```bash
cd D:\work\fusion\Avatar\MuseTalk\test_client
python -m venv .venv
.venv\Scripts\pip install -r requirements.txt
```