今回は音声処理です。
ラズパイにて音声取り込み、PC上にてSST(Speech to Text)変換、AI(Ollama)にてText処理し、その出力をTTS(Text to Speech)変換 、ラズパイにて音声再生という流れで製作していきます。
レイテンシ1秒程度を目標とします。今回もChatGPTとの共作になります。といっても、ほぼCahtGPTが作ってます。私は、「こうしたいのだけど」、「もうちょっとレスポンス上げたいんだけど」、と言っているだけです。
音声処理の方式
Rapi は基本入出力だけ。重い処理は全部PC。
データフロー
- Raspi→PC(WebSocket)
- Mic Audio(常時)
- PC:音声処理パイプライン
- VAD(発話区間検出)
- STT(Whisper系)
- Ollama(ローカルAI)
- TTS(PC側で音声生成)
- PC→RasPi(HTTP)
- TTS Audio(返答音声)
以下詳細です。
全体構成
[ Raspberry Pi 3B ]
├─ 音声送信(WebSocket)
└─ 音声再生(TTS結果)
↓
[ PC(ローカル) ]
├─ 音声受信サーバ(Python)
│ ├─ STT(Whisper等)
│ ├─ Ollama(LLM)
│ └─ TTS(音声生成)
└─ 応答音声を Raspi へ送信
音声通信方針
通信方式
| 項目 | 採用 |
|---|---|
| 音声送信 | WebSocket |
| 通話形態 | 半二重 |
| 音声単位 | 発話ごと(VADで区切る) |
| 圧縮 | WAV or PCM(最初は非圧縮) |
コンポーネント分割(PC側)
PC側は「2プロセス構成」にする
① 音声会話サーバ(Python)
役割
- Raspi から音声を受信
- STT → Ollama → TTS を直列処理
- 応答音声を Raspi に返送
audio_server.py
├─ WebSocket サーバ
├─ STT 処理
├─ Ollama API 呼び出し
└─ TTS 処理
音声処理パイプライン(詳細)
① Raspi → PC(音声送信)
- Raspi マイクで録音
- 音声データを WebSocket で送信
[Raspi]
音声入力
↓
PCM
↓
WebSocket送信
② PC側:STT(Speech To Text)
- Whisper(faster-whisper)
音声データ
↓
Whisper
↓
テキスト(日本語)
③ Ollama 連携(LLM)
Ollama の使い方
Ollama は ローカルHTTP API を提供しています。
http://localhost:11434/api/chat
入力
{
"model": "gemma3",
"messages": [
{"role": "system", "content": "あなたはロボットの対話AIです"},
{"role": "user", "content": "今の発話テキスト"}
]
}
出力
{
"message": {
"role": "assistant",
"content": "応答テキスト"
}
}
④ TTS(Text To Speech)
応答テキスト
↓
TTS
↓
WAV音声
⑤ PC → Raspi(音声返信)
- HTTPで WAV 音声を返送
- Raspi 側で即再生
[PC]
WAV音声
↓
HTTP
↓
[Raspi]
speaker再生
レイテンシ見積(見込み)
| 処理 | 目安 |
|---|---|
| 音声送信 | 50ms |
| STT | 300〜600ms |
| Ollama | 200〜500ms |
| TTS | 200〜400ms |
| 音声返送 | 50ms |
| 合計 | 0.8〜1.5秒 |
👉 「話しかけたらすぐ返る」体感は十分可能との見込みだったが、実際はこの3倍ほどかかっている。PCが、Core Ultra 125H CPUのため、正直CPU処理だけでは厳しい。将来的にはGPUへのオフロードも検討する。
Raspi側の処理
- USB オーディオ:Sound Blaster Play! 3 採用(家にあったので)
- ALSA デバイス確認:ソースコードのカードID, デバイスIDを一致させる。
arecord -l
例:
card 2: S3 [Sound Blaster Play! 3], device 0: USB Audio
必要パッケージ(Raspi)
sudo apt update
sudo apt install -y portaudio19-dev
sudo apt install -y alsa-utils
python3 -m venv venv_audio
source venv_audio/bin/activate
pip install sounddevice numpy websockets
pip install fastapi uvicorn httpx
デバイス確認(重要)
python - << 'EOF'
import sounddevice as sd
print(sd.query_devices())
EOF
Sound Blaster Play! 3がPythonからも 入力デバイスとして見えること- 例:
device=1
録音テスト(単体テストとして最重要チェックポイントです)
import sounddevice as sd
import numpy as np
DEVICE = 1
SR = 48000
DUR = 2
x = sd.rec(int(SR*DUR), samplerate=SR, channels=1, dtype="int16", device=DEVICE)
sd.wait()
print("rms=", np.sqrt(np.mean(x.astype(np.float32)**2)))
判定基準
rms > 0→ 音声取得OKrms ≈ 0→ 入力デバイス or マイク未認識
Raspi 側のエラーハンドリング
| エラー | 原因 | 対処 |
|---|---|---|
PortAudio library not found | OS側にPortAudioなし | apt install portaudio19-dev |
Invalid sample rate | 16kHz非対応 | 48kHzに変更 |
| デバイスが見えない | 権限/接続 | audio グループ追加、再起動 |
| 音が取れない | channels不一致 | channels=2 → mono化 |
PC 側の処理
実行環境
- Windows PowerShell(推奨)
- Python 仮想環境を音声用に分離
PowerShell を管理者権限で起動(最初だけ)
Set-ExecutionPolicy -Scope CurrentUser -ExecutionPolicy RemoteSigned
python -m venv venv_stt
.\venv_stt\Scripts\Activate.ps1
pip install websockets numpy faster-whisper
pip install pyttsx3 httpx
実行手順
PC側
venv_stt/script/activate.ps1
python pc_ws_stt_wevrtcvad_ollama_tts_raspi.py
Raspi側
source venv_audio/bin/activate
python raspi_audio_sender_arecord.py
別ターミナルにて
source venv_audio/bin/activate
uvicorn speak_server:app --host 0.0.0.0 --port 5005
*)port 5005はPC側と一致させる
各ソースコードです。
音声送信(Raspi->PC) :raspi_audio_sender_arecord.py
#!/usr/bin/env python3
import asyncio
import json
import signal
import subprocess
from dataclasses import dataclass
from typing import Optional
import numpy as np
import websockets
# =========================
# 設定
# =========================
WS_URL = "ws://192.168.0.26:8765" # ★PCのIPに変更
#ARECORD_DEVICE = "hw:1,0" # ★arecord -l の card,device番号に合わせる
ARECORD_DEVICE = "hw:0,0" # ★arecord -l の card,device番号に合わせる
SAMPLE_RATE = 48000
ARECORD_CHANNELS = 2 # Play! 3 は 2ch固定になりやすい
SEND_CHANNELS = 1 # 送信はmono
FORMAT = "S16_LE"
FRAME_MS = 20
RECONNECT_SEC = 1.0
OPEN_TIMEOUT_SEC = 30
PING_INTERVAL_SEC = 20
PING_TIMEOUT_SEC = 20
READ_CHUNK_BYTES = 4096
# 送信キュー(ネット詰まり吸収)
SEND_QUEUE_MAX = 200 # 最大4秒ぶん(20ms×200)
# =========================
BYTES_PER_SAMPLE = 2 # int16
FRAME_SAMPLES = int(SAMPLE_RATE * FRAME_MS / 1000) # 960
ARECORD_FRAME_BYTES = FRAME_SAMPLES * ARECORD_CHANNELS * BYTES_PER_SAMPLE # 3840
@dataclass
class ArecordProc:
proc: subprocess.Popen
def build_arecord_cmd() -> list[str]:
return [
"arecord",
"-D", ARECORD_DEVICE,
"-f", FORMAT,
"-c", str(ARECORD_CHANNELS),
"-r", str(SAMPLE_RATE),
"-t", "raw",
]
def start_arecord() -> ArecordProc:
cmd = build_arecord_cmd()
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
bufsize=0,
)
if proc.stdout is None:
proc.terminate()
raise RuntimeError("Failed to open arecord stdout")
return ArecordProc(proc=proc)
def stop_arecord(p: Optional[ArecordProc]) -> None:
if p is None:
return
proc = p.proc
if proc.poll() is None:
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
try:
proc.kill()
except Exception:
pass
def stereo_to_mono_int16(frame_bytes: bytes) -> bytes:
pcm = np.frombuffer(frame_bytes, dtype=np.int16).reshape(-1, 2)
mono = ((pcm[:, 0].astype(np.int32) + pcm[:, 1].astype(np.int32)) // 2).astype(np.int16)
return mono.tobytes()
async def ws_sender(ws, send_q: asyncio.Queue[bytes], stop_event: asyncio.Event):
"""送信専用タスク"""
while not stop_event.is_set():
data = await send_q.get()
try:
await ws.send(data)
finally:
send_q.task_done()
async def run_one_connection(stop_event: asyncio.Event):
"""
1回のWS接続セッション
- arecord stdout read は to_thread で非ブロッキング化
- 送信は別タスク ws_sender が担当
"""
arec: Optional[ArecordProc] = None
send_q: asyncio.Queue[bytes] = asyncio.Queue(maxsize=SEND_QUEUE_MAX)
async with websockets.connect(
WS_URL,
max_size=None,
open_timeout=OPEN_TIMEOUT_SEC,
ping_interval=PING_INTERVAL_SEC,
ping_timeout=PING_TIMEOUT_SEC,
close_timeout=3,
) as ws:
# meta送信
meta = {
"type": "audio_meta",
"sample_rate": SAMPLE_RATE,
"channels": SEND_CHANNELS,
"dtype": "int16",
"frame_ms": FRAME_MS,
}
await ws.send(json.dumps(meta))
print(f"✅ WS connected -> {WS_URL}")
print(f"🎙️ arecord: device={ARECORD_DEVICE}, sr={SAMPLE_RATE}, ch={ARECORD_CHANNELS}, frame={FRAME_MS}ms")
sender_task = asyncio.create_task(ws_sender(ws, send_q, stop_event))
try:
arec = start_arecord()
buf = bytearray()
while not stop_event.is_set():
# ★重要:ブロッキングreadをスレッドで実行してイベントループを止めない
chunk = await asyncio.to_thread(arec.proc.stdout.read, READ_CHUNK_BYTES)
if not chunk:
raise RuntimeError("arecord stream ended (EOF)")
buf.extend(chunk)
while len(buf) >= ARECORD_FRAME_BYTES:
frame = bytes(buf[:ARECORD_FRAME_BYTES])
del buf[:ARECORD_FRAME_BYTES]
mono_bytes = stereo_to_mono_int16(frame)
# キュー満杯なら古い音声を捨てて最新優先(遅延が積み上がらない)
if send_q.full():
try:
_ = send_q.get_nowait()
send_q.task_done()
except asyncio.QueueEmpty:
pass
await send_q.put(mono_bytes)
finally:
stop_arecord(arec)
arec = None
sender_task.cancel()
# キャンセル完了待ち(例外握りつぶし)
try:
await sender_task
except Exception:
pass
async def main():
stop_event = asyncio.Event()
def _stop():
stop_event.set()
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(sig, _stop)
except NotImplementedError:
pass
print("Starting Raspberry Pi audio sender (arecord -> buffer -> mono -> WS queue sender)")
print(f"WS_URL={WS_URL}")
print("Press Ctrl+C to stop.")
while not stop_event.is_set():
try:
await run_one_connection(stop_event)
except Exception as e:
print(f"⚠️ WS session error: {e}")
if not stop_event.is_set():
print(f"🔁 Reconnecting in {RECONNECT_SEC:.1f}s ...")
await asyncio.sleep(RECONNECT_SEC)
print("Stopped.")
if __name__ == "__main__":
asyncio.run(main())
VAD=>SST=>Olama=>TTS変換(PC):pc_ws_stt_wevrtcvad_ollama_tts_raspi.py
import asyncio
import json
import time
import audioop
from collections import deque
from dataclasses import dataclass
from typing import Deque, Optional
import numpy as np
import websockets
import webrtcvad
from faster_whisper import WhisperModel
import os
import wave
from datetime import datetime
import re
import httpx
import subprocess
from datetime import datetime
import pyttsx3
import tempfile
RASPI_SPEAK_URL = "http://192.168.0.12:5005/speak"
RASPI_PLAY_WAV_URL = "http://192.168.0.12:5005/play_wav"
#Raspi通信
async def speak_on_raspi(text: str) -> None:
async with httpx.AsyncClient(timeout=120.0) as client:
r = await client.post(
RASPI_SPEAK_URL,
json={"text": text},
headers={"Content-Type": "application/json"},
)
r.raise_for_status()
def sapi_text_to_wav_bytes(text: str, rate: int | None = None, voice_name_contains: str | None = None) -> bytes:
"""
Windowsローカル(SAPI5)で text→WAV を生成してバイト列で返す
- rate: 話速(例: 180〜220) Noneなら既定
- voice_name_contains: "Haruka" など、名前に含まれる文字で声を選ぶ(任意)
"""
engine = pyttsx3.init("sapi5")
if rate is not None:
engine.setProperty("rate", rate)
if voice_name_contains:
for v in engine.getProperty("voices"):
name = getattr(v, "name", "")
if voice_name_contains.lower() in name.lower():
engine.setProperty("voice", v.id)
break
fd, wav_path = tempfile.mkstemp(suffix=".wav")
os.close(fd)
try:
engine.save_to_file(text, wav_path)
engine.runAndWait()
with open(wav_path, "rb") as f:
return f.read()
finally:
try:
os.remove(wav_path)
except OSError:
pass
async def play_wav_on_raspi(wav_bytes: bytes) -> None:
async with httpx.AsyncClient(timeout=httpx.Timeout(120.0)) as client:
r = await client.post(
RASPI_PLAY_WAV_URL,
content=wav_bytes,
headers={"Content-Type": "audio/wav"},
)
r.raise_for_status()
speaking_evt = asyncio.Event()
# 今日の日付はアプリが答える
def is_date_question(t: str) -> bool:
t = t.strip()
keys = [
"今日は何日", "今日何日", "今日は何月", "今日の日付", "何月何日", "今日って何日",
"今日何月", "本日何日", "本日の日付", "きょうは何日"
]
return any(k in t for k in keys)
def today_jp() -> str:
now = datetime.now()
return f"今日は{now.year}年{now.month}月{now.day}日です。"
# -------------------------
# ガベージ判定(既存)
# -------------------------
def looks_like_garbage(text: str) -> bool:
t = text.strip()
if not t:
return True
# 1) 数字や記号の連打(3.5.5.5... など)
if re.fullmatch(r"[0-9\.\-,:;\/\s]+", t) and len(t) >= 8:
return True
# 2) 同じ短いパターンの繰り返し
if len(t) >= 24:
for key in ["このように", "ご視聴ありがとうございました", "ジェス", "ありがとう"]:
if t.count(key) >= 4:
return True
if len(set(t)) <= 5:
return True
# 3) 典型の誤出力(音楽タグ)
if t in {"【音楽】", "[音楽]", "音楽"}:
return True
return False
# -------------------------
# WAV保存(デバッグ用:既存)
# -------------------------
SAVE_WAV_DIR = r"./debug_wavs"
SAVE_WAV_ENABLED = False
SAVE_WAV_MAX_PER_RUN = 200
def save_pcm16_to_wav(pcm_i16_bytes: bytes, sample_rate: int, wav_dir: str, prefix: str, index: int) -> str:
os.makedirs(wav_dir, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
path = os.path.join(wav_dir, f"{prefix}_{ts}_{index:04d}.wav")
with wave.open(path, "wb") as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(sample_rate)
wf.writeframes(pcm_i16_bytes)
return path
# =========================
# サーバ設定
# =========================
HOST = "0.0.0.0"
PORT = 8765
# =========================
# Whisper
# =========================
#MODEL_SIZE = "small"
MODEL_SIZE = "medium"
DEVICE = "cpu"
COMPUTE_TYPE = "int8"
LANGUAGE = "ja"
model = WhisperModel(MODEL_SIZE, device=DEVICE, compute_type=COMPUTE_TYPE)
# 音声仕様(受信は48k int16 mono 20ms)
IN_SR = 48000
OUT_SR = 16000
SAMPLE_WIDTH = 2
CHANNELS = 1
FRAME_MS = 20
IN_SAMPLES = IN_SR * FRAME_MS // 1000
IN_BYTES = IN_SAMPLES * SAMPLE_WIDTH
OUT_SAMPLES = OUT_SR * FRAME_MS // 1000
OUT_BYTES = OUT_SAMPLES * SAMPLE_WIDTH
# VAD設定
VAD_MODE = 3
SILENCE_END_MS = 800
MIN_UTTER_MS = 350
MAX_UTTER_SEC = 10
NO_SPEECH_THRESHOLD = 0.90
LOG_PROB_THRESHOLD = -0.60
# キュー(遅延蓄積防止:最新のみ)
UTTER_QUEUE_MAX = 1
TEXT_QUEUE_MAX = 1
# =========================
# Ollama / TTS 設定
# =========================
OLLAMA_URL = "http://127.0.0.1:11434"
OLLAMA_MODEL = "gemma3:4b" # ← api/tags の name を正確に
SYSTEM_PROMPT = (
"あなたは日本語で正確に答えるローカル音声対話ロボットです。"
"行政区分や住所については、推測せず、"
"不確かな場合は『おそらくですが、』と言ったあとに、回答を加えてください。"
"無駄な前置きや箇条書きは避けてください。"
"返答は日本語で、短く自然な口調で1〜2文。"
"直近の発話を重視してください"
"古い話題は自然に忘れて構いません"
)
HALLUCINATION_PHRASES = {
"ご視聴ありがとうございました",
"ご視聴ありがとうございました!",
"ん",
}
async def ollama_chat(messages: list[dict]) -> str:
"""
Ollama: /api/chat (stream=false)
messages: [{"role":"system/user/assistant", "content":"..."}...]
"""
payload = {
"model": OLLAMA_MODEL,
"messages": messages,
"stream": False,
}
async with httpx.AsyncClient(timeout=120.0) as client:
r = await client.post(f"{OLLAMA_URL}/api/chat", json=payload)
r.raise_for_status()
data = r.json()
return (data.get("message", {}).get("content") or "").strip()
def speak_windows_sapi_blocking(text: str) -> None:
"""
Windows標準 SAPI で喋らせる
"""
safe = text.replace('"', '""')
cmd = [
"powershell",
"-NoProfile",
"-Command",
(
"Add-Type -AssemblyName System.Speech; "
"$speak = New-Object System.Speech.Synthesis.SpeechSynthesizer; "
f'$speak.Speak("{safe}");'
)
]
subprocess.run(cmd, check=False)
# =========================
# 型
# =========================
@dataclass
class AudioMeta:
sample_rate: int
channels: int
dtype: str
frame_ms: int
def resample_48k_to_16k_i16(pcm48_i16_bytes: bytes, state=None):
converted, new_state = audioop.ratecv(
pcm48_i16_bytes, SAMPLE_WIDTH, CHANNELS, IN_SR, OUT_SR, state
)
return converted, new_state
def transcribe_sync(audio16_f32: np.ndarray) -> str:
segments, info = model.transcribe(
audio16_f32,
language=LANGUAGE,
vad_filter=False,
beam_size=2,
best_of=1,
temperature=0.0,
condition_on_previous_text=False,
no_speech_threshold=NO_SPEECH_THRESHOLD,
log_prob_threshold=LOG_PROB_THRESHOLD,
)
return "".join(seg.text for seg in segments).strip()
class WebRTCVADSegmenter:
def __init__(self):
self.vad = webrtcvad.Vad(VAD_MODE)
self.silence_frames_needed = max(1, SILENCE_END_MS // FRAME_MS)
self.min_frames = max(1, MIN_UTTER_MS // FRAME_MS)
self.max_frames = max(1, int(MAX_UTTER_SEC * 1000 // FRAME_MS))
# プレロール(頭切れ防止)
self.pre_roll_frames = max(1, 200 // FRAME_MS)
self.pre_roll: Deque[bytes] = deque(maxlen=self.pre_roll_frames)
self.state = "idle"
self.current: Deque[bytes] = deque()
self.silence_count = 0
self.total_frames = 0
self.speech_frames = 0
self.last_reason = "silence" # or "max"
def push_frame_16k(self, pcm16_i16_bytes: bytes) -> Optional[bytes]:
is_speech = self.vad.is_speech(pcm16_i16_bytes, OUT_SR)
self.total_frames += 1
if is_speech:
self.speech_frames += 1
if self.state == "idle":
self.pre_roll.append(pcm16_i16_bytes)
if is_speech:
self.state = "speaking"
self.current.clear()
self.silence_count = 0
for f in self.pre_roll:
self.current.append(f)
self.current.append(pcm16_i16_bytes)
return None
# speaking
self.current.append(pcm16_i16_bytes)
if len(self.current) >= self.max_frames:
out = b"".join(self.current)
self.last_reason = "max"
self._reset()
return out
if not is_speech:
self.silence_count += 1
if self.silence_count >= self.silence_frames_needed:
out = b"".join(self.current) if len(self.current) >= self.min_frames else None
self.last_reason = "silence"
self._reset()
return out
else:
self.silence_count = 0
return None
def _reset(self):
self.state = "idle"
self.current.clear()
self.silence_count = 0
# utter単位の統計を次のutterに持ち越さない
self.total_frames = 0
self.speech_frames = 0
# =========================
# ワーカー分離
# =========================
async def stt_worker(utter_queue: asyncio.Queue[bytes], text_queue: asyncio.Queue[str], stop_evt: asyncio.Event):
"""
utter_queue: 音声まとまり(16k int16)
text_queue : STT結果(最新のみ)
"""
saved_count = 0
while not stop_evt.is_set():
try:
utter16 = await asyncio.wait_for(utter_queue.get(), timeout=0.2)
except asyncio.TimeoutError:
continue
# --- 最新優先:utter_queue に溜まった分は捨てて「最後の1件」だけ処理する ---
# ※ここで呼ぶ task_done() は「直前に取り出していた user_text を捨てた扱い」にするため。
# 最後に処理する user_text は、処理が終わった場所(finally)で task_done() する。
try:
while True:
newer = utter_queue.get_nowait()
utter_queue.task_done() # 捨てるぶんは完了扱い
utter16 = newer # 最後に取れたものを処理対象にする
except asyncio.QueueEmpty:
pass
t0 = time.perf_counter()
# デバッグWAV保存
if SAVE_WAV_ENABLED and saved_count < SAVE_WAV_MAX_PER_RUN:
saved_count += 1
wav_path = save_pcm16_to_wav(
pcm_i16_bytes=utter16,
sample_rate=OUT_SR,
wav_dir=SAVE_WAV_DIR,
prefix="utter",
index=saved_count
)
print(f"💾 saved wav: {wav_path}")
pcm16 = np.frombuffer(utter16, dtype=np.int16)
audio16_f32 = pcm16.astype(np.float32) / 32768.0
rms_f32 = float(np.sqrt(np.mean(audio16_f32 * audio16_f32))) if audio16_f32.size else 0.0
if rms_f32 < 0.008:
utter_queue.task_done()
continue
dur_sec = len(audio16_f32) / OUT_SR
#短すぎる発話を捨てる <1.0秒)
if dur_sec < 1.0:
utter_queue.task_done()
continue
print("…STT running…")
text = await asyncio.to_thread(transcribe_sync, audio16_f32)
print("…STT done…")
# 定型ハルシネーション除去
if text in HALLUCINATION_PHRASES:
utter_queue.task_done()
print(f"ハルシネーション除去:{text}")
continue
t1 = time.perf_counter()
if text and not looks_like_garbage(text):
print(f"📝 {text} (dur={dur_sec:.2f}s stt={t1-t0:.2f}s)")
# ★LLM/TTSはここでやらない:別ワーカーへ
if text_queue.full():
try:
_ = text_queue.get_nowait()
text_queue.task_done()
except asyncio.QueueEmpty:
pass
await text_queue.put(text)
utter_queue.task_done()
# =========================
# Wake / Sleep 制御
# =========================
conversation_active = False
WAKE_PHRASES = [
"ハイロボ", "はいロボ", "はい、ロボ", "会話開始",
"おはようございます。", "こんにちは", "こんばんは",
]
SLEEP_PHRASES = [
"会話終了", "終了", "ストップ", "おやすみ","ありがとう"
]
def normalize_text(t: str) -> str:
# STT揺れ対策(空白除去)
return t.replace(" ", "").replace(" ", "").strip()
async def reply_worker(text_queue: asyncio.Queue[str], stop_evt: asyncio.Event):
"""
text_queue から受け取ったテキストに対して
Ollama(履歴付き) → TTS を実行する
"""
global conversation_active
# --- 会話履歴(messages) ---
# 直近 10 メッセージ = 5往復ぶん保持(記憶優先)
#MAX_HISTORY_MSG = 10
# 直近 4 メッセージ = 2往復ぶん保持(速度優先)
MAX_HISTORY_MSG = 4
history: list[dict] = []
# system は毎回先頭に入れる(履歴には含めない)
system_msg = {"role": "system", "content": SYSTEM_PROMPT}
while not stop_evt.is_set():
try:
user_text = await asyncio.wait_for(text_queue.get(), timeout=0.2)
except asyncio.TimeoutError:
continue
norm = normalize_text(user_text)
# =========================
# Wake 判定(非アクティブ時)
# =========================
if not conversation_active:
if any(p in norm for p in WAKE_PHRASES):
conversation_active = True
reply = "はい、どうしましたか。"
print("🔔 Wake:", reply)
wav_bytes = await asyncio.to_thread(
sapi_text_to_wav_bytes, reply
)
speaking_evt.set()
try:
await play_wav_on_raspi(wav_bytes)
finally:
speaking_evt.clear()
# 呼び出し語以外は完全無視
text_queue.task_done()
continue
# =========================
# Sleep 判定(アクティブ時)
# =========================
if any(p in norm for p in SLEEP_PHRASES):
conversation_active = False
reply = "了解しました。待機します。"
print("💤 Sleep:", reply)
wav_bytes = await asyncio.to_thread(
sapi_text_to_wav_bytes, reply
)
speaking_evt.set()
try:
await play_wav_on_raspi(wav_bytes)
finally:
speaking_evt.clear()
text_queue.task_done()
continue
# --- 最新優先:text_queue に溜まった分は捨てて「最後の1件」だけ処理する ---
# ※ここで呼ぶ task_done() は「直前に取り出していた user_text を捨てた扱い」にするため。
# 最後に処理する user_text は、処理が終わった場所(finally)で task_done() する。
try:
while True:
newer = text_queue.get_nowait()
text_queue.task_done() # 捨てるぶんは完了扱い
user_text = newer
except asyncio.QueueEmpty:
pass
try:
# 日付系はアプリが確実に答える(オフラインでも正解)
if is_date_question(user_text):
reply = today_jp()
print(f"🗣️ {reply}")
# 最新優先:この瞬間に次の入力が来ていたら古い返答は喋らない
if not text_queue.empty():
continue
# PCでWAV生成→Raspiで再生(ローカル完結)
wav_bytes = await asyncio.to_thread(sapi_text_to_wav_bytes, reply)
speaking_evt.set()
try:
await play_wav_on_raspi(wav_bytes)
finally:
speaking_evt.clear()
# 履歴にも入れて“会話っぽさ”を維持(任意だが入れておく)
history.append({"role": "user", "content": user_text})
history.append({"role": "assistant", "content": reply})
if len(history) > MAX_HISTORY_MSG:
history = history[-MAX_HISTORY_MSG:]
continue
# 通常の会話:履歴をつけて Ollama に投げる
messages = [system_msg] + history + [{"role": "user", "content": user_text}]
print("🤖 THINKING...")
reply = await ollama_chat(messages)
if reply:
print(f"🗣️ {reply}")
# 最新優先:この瞬間に次の入力が来ていたら古い返答は喋らない
if not text_queue.empty():
continue
wav_bytes = await asyncio.to_thread(sapi_text_to_wav_bytes, reply)
speaking_evt.set()
try:
await play_wav_on_raspi(wav_bytes)
finally:
speaking_evt.clear()
# ③ 履歴更新(user→assistant のペアを追加)
history.append({"role": "user", "content": user_text})
history.append({"role": "assistant", "content": reply})
# ④ 履歴が長くなったら古いものを捨てる
if len(history) > MAX_HISTORY_MSG:
history = history[-MAX_HISTORY_MSG:]
except Exception as e:
print(f"⚠️ reply error: {type(e).__name__}: {e}")
finally:
text_queue.task_done()
# =========================
# WebSocket Handler
# =========================
async def handler(ws, utter_queue: asyncio.Queue[bytes]):
meta_raw = await ws.recv()
meta_json = json.loads(meta_raw)
if meta_json.get("type") != "audio_meta":
await ws.close()
return
meta = AudioMeta(
sample_rate=int(meta_json["sample_rate"]),
channels=int(meta_json["channels"]),
dtype=str(meta_json["dtype"]),
frame_ms=int(meta_json["frame_ms"]),
)
print("✅ client connected:", meta_json)
if not (meta.sample_rate == 48000 and meta.channels == 1 and meta.dtype == "int16" and meta.frame_ms == FRAME_MS):
print("⚠️ 推奨フォーマットと異なります:", meta_json)
vad = WebRTCVADSegmenter()
rs_state = None
try:
async for msg in ws:
if isinstance(msg, str):
continue
# 48k 20ms -> 16k 20ms
pcm16, rs_state = resample_48k_to_16k_i16(msg, rs_state)
# ★しゃべっている間はマイク入力を無視(回り込み防止)
if speaking_evt.is_set():
continue
# webrtcvadはフレーム長に厳密
if len(pcm16) != OUT_BYTES:
continue
utter = vad.push_frame_16k(pcm16)
if utter is not None:
# ---- Crosstalk / 雑談ゲート(軽量) ----
# 強制カット(max)かつ speech比率が高すぎる = ずっと誰か喋ってる
# → 複数人会話・環境雑談になりやすいので捨てる
if vad.last_reason == "max":
ratio = (vad.speech_frames / max(1, vad.total_frames))
if ratio >= 0.95: #雑談が多い環境なら0.90とかに下げる
# デバッグしたければprint
print(f"🚫 drop crosstalk-like utter: reason=max, speech_ratio={ratio:.2f}")
continue
# 遅延蓄積防止:最新のみ
if utter_queue.full():
try:
_ = utter_queue.get_nowait()
utter_queue.task_done()
except asyncio.QueueEmpty:
pass
await utter_queue.put(utter)
except websockets.ConnectionClosed:
print("❌ client disconnected")
# =========================
# main
# =========================
async def main():
stop_evt = asyncio.Event()
utter_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=UTTER_QUEUE_MAX)
text_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=TEXT_QUEUE_MAX)
stt_task = asyncio.create_task(stt_worker(utter_queue, text_queue, stop_evt))
reply_task = asyncio.create_task(reply_worker(text_queue, stop_evt))
print(f"Starting WS STT server ws://{HOST}:{PORT}")
server = await websockets.serve(
lambda ws: handler(ws, utter_queue),
HOST, PORT,
max_size=None,
ping_interval=20,
ping_timeout=20,
)
try:
await asyncio.Future()
except KeyboardInterrupt:
print("Stopping...")
finally:
stop_evt.set()
stt_task.cancel()
reply_task.cancel()
server.close()
await server.wait_closed()
if __name__ == "__main__":
asyncio.run(main())
音声受信再生(PC->Raspi):speak_serve.py
from fastapi import FastAPI, Request
import subprocess
app = FastAPI()
# Sound Blaster Play! 3
ALSA_DEVICE = "plughw:1,0"
#ALSA_DEVICE = "plughw:0,0"
@app.post("/play_wav")
async def play_wav(req: Request):
wav_bytes = await req.body()
if not wav_bytes:
return {"ok": True, "skipped": True}
p = subprocess.Popen(
["aplay", "-D", ALSA_DEVICE, "-"],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
try:
p.stdin.write(wav_bytes)
p.stdin.close()
p.wait()
return {"ok": True, "bytes": len(wav_bytes)}
finally:
if p.poll() is None:
p.terminate()
