Whisperのlarge-v3モデルでリアルタイム文字起こしをしてみる

2024年5月27日

はじめに

whisperに「large-v3」モデルが公開されていることに今更ながら気が付いたので、リアルタイム文字起こしをしてみたいと思います。
出来る限りリアルタイムにしたいので、実際に使用するのは「faster-Whisper」です。
なお、音声周りのプログラムは書いたことがないので、適切でない処理や記載があるかもしれません。ご了承ください。

whisperとfaster-whisperについて

whisper

Whisperは、OpenAIが開発した音声認識モデルです。多言語対応で、音声認識だけでなく、音声翻訳や言語識別も行うことができます。大規模かつ多様なデータセットで訓練されており、高精度の音声認識を実現しています​。

faster-whisper

faster-whisperは、Whisperモデルをより高速かつ効率的に動作させるために最適化されたバージョンです。リアルタイム音声認識の性能向上を目指しており、遅延を減らしつつ高精度の認識を提供します。

PC環境

  • OS: Windows 10
  • CPU: Intel Core i9-12900K
  • RAM: 32GB
  • GPU:GeForce RTX4090
  • Python: 3.10

必要なライブラリのインストール

プログラムを動作させるために、以下のライブラリをインストールします。

pip install numpy
pip install watchdog
pip install sounddevice
pip install scipy
pip install faster_whisper

作成したい機能

1.喋っている途中で文字起こしをする
2.喋っている途中だと文章が不完全な可能性があるので、喋り終わったら最新情報で更新する
3.できる限り早いレスポンス

処理のイメージ

こんなイメージです。

プログラムの説明

githubにも同じものを公開しています。

プログラム全体

録音と文字起こしを別スレッドで処理するようにしています。
録音側はただwavファイルを出力するだけ、文字起こし側はwavファイルの文字起こしと削除を行っています。

import os
import time
import numpy as np
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from threading import Thread
import sounddevice as sd
from scipy.io.wavfile import write
from faster_whisper import WhisperModel

HALLUCINATION_TEXTS = [
    "ご視聴ありがとうございました", "ご視聴ありがとうございました。",
    "ありがとうございました", "ありがとうございました。",
    "どうもありがとうございました", "どうもありがとうございました。",
    "どうも、ありがとうございました", "どうも、ありがとうございました。",
    "おやすみなさい", "おやすみなさい。",
    "Thanks for watching!",
    "終わり", "おわり",
    "お疲れ様でした", "お疲れ様でした。",
]

# モデルのロード
MODEL_SIZE = "large-v3"
model = WhisperModel(MODEL_SIZE, device="cuda", compute_type="float16")

def record_audio(audio_directory, fs=16000, silence_threshold=0.3, min_duration=0.1, amplitude_threshold=0.01, out_duration=0.5):
    # wavファイルの出力先を作成
    audio_directory.mkdir(parents=True, exist_ok=True)

    # 録音処理
    while True:
        file_name = f"recorded_audio_{int(time.time())}"
        recorded_audio = []
        silent_time = 0
        speak_time = 0
        speak_cnt = 1

        try:
            with sd.InputStream(samplerate=fs, channels=1) as stream:
                # 最初に無音状態が終わるまでは録音せずに待機
                while True:
                    data, overflowed = stream.read(int(fs * min_duration))
                    if overflowed:
                        print("Overflow occurred. Some samples might have been lost.")
                    if np.any(np.abs(data) >= amplitude_threshold):
                        recorded_audio.append(data)
                        break

                # 録音を開始してから無音状態になるまでループ
                while True:
                    data, overflowed = stream.read(int(fs * min_duration))
                    if overflowed:
                        print("Overflow occurred. Some samples might have been lost.")
                    recorded_audio.append(data)
                    if np.all(np.abs(data) < amplitude_threshold):
                        silent_time += min_duration
                        if silent_time >= silence_threshold:
                            break
                    else:
                        # 一定時間経過で話し途中でもwavファイル作成
                        speak_time += min_duration
                        if speak_time >= out_duration:
                            file_path = audio_directory / f"{file_name}_{speak_cnt}.wav"
                            speak_time = 0
                            speak_cnt += 1
                            audio_data = np.concatenate(recorded_audio, axis=0)
                            audio_data = np.int16(audio_data * 32767)
                            write(file_path, fs, audio_data)
                        silent_time = 0
        except Exception as e:
            print(f"Error in record_audio: {e}")
            continue

        # 無音検知によるwavファイル作成
        file_path = audio_directory / f"{file_name}_latest.wav"
        audio_data = np.concatenate(recorded_audio, axis=0)
        audio_data = np.int16(audio_data * 32767)
        write(file_path, fs, audio_data)

class FileHandler(FileSystemEventHandler):
    def on_created(self, event):
        if event.is_directory:
            return

        file_name, file_ext = os.path.splitext(event.src_path)
        if not file_name.endswith("_latest"):
            base_name = file_name.rsplit('_', 1)[0]
            suffix = int(file_name.rsplit('_', 1)[-1])
            if os.path.exists(os.path.join(os.path.dirname(event.src_path), f"{base_name}_latest.wav")):
                # 最終ファイルがあるので処理不要、ファイル削除してスキップ
                os.remove(event.src_path)
                return
            if os.path.exists(os.path.join(os.path.dirname(event.src_path), f"{base_name}_{suffix + 1}.wav")):
                # 次ファイルがあるので処理不要、ファイル削除してスキップ
                os.remove(event.src_path)
                return

        # 文字起こしして、ファイルを削除
        self.process_file(event.src_path)
        os.remove(event.src_path)

    def process_file(self, file_path):
        # 文字起こし
        transcription = self.transcribe(file_path)

        # ハルシネーションで出力された可能性のある場合は処理しない
        if transcription in HALLUCINATION_TEXTS:
            return

        if transcription:
            if "latest" in str(file_path):
                # 最終ファイルの場合、そのまま出力
                print(transcription)
            else:
                # 喋っている途中の文字起こしは《》で囲う
                print("《"+transcription+"》")

    def transcribe(self, file_path):
        try:
            with open(file_path, 'rb') as audio_file:
                segments, _ = model.transcribe(audio_file, language="ja", beam_size=5, patience=0.5)
                transcription = ''.join(segment.text for segment in segments)
            return transcription
        except Exception as e:
            print(f"Error in transcribe: {e}")
            return ""

def start_monitoring(watch_path):
    # 録音処理(スレッドを立てる)
    # wavファイルを生成し続ける処理
    record_thread = Thread(target=record_audio, args=(watch_path, 16000, 0.3, 0.1, 0.01, 0.3))
    record_thread.daemon = True
    record_thread.start()

    # フォルダを監視してwavファイルが生成された場合
    # 文字起こし処理を行う
    event_handler = FileHandler()
    observer = Observer()
    observer.schedule(event_handler, watch_path, recursive=False)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

if __name__ == "__main__":
    start_monitoring(Path.cwd() / "tmp")

録音処理(record_audio)

無音状態のままだと録音をせずに待機して、音を検知して録音を開始します。その後、再度無音になるまで録音を行います。無音判定の閾値は引数で調整可能です。

録音中も一定期間ごとに、wavファイルを出力しています。喋り終わらないと文字起こしが実行されないのはリアルタイム感がないなぁと思ったので、喋り途中でも問答無用でwavファイルを出力する仕様にしました。

def record_audio(audio_directory, fs=16000, silence_threshold=0.3, min_duration=0.1, amplitude_threshold=0.01, out_duration=0.5):
    # wavファイルの出力先を作成
    audio_directory.mkdir(parents=True, exist_ok=True)

    # 録音処理
    while True:
        file_name = f"recorded_audio_{int(time.time())}"
        recorded_audio = []
        silent_time = 0
        speak_time = 0
        speak_cnt = 1

        try:
            with sd.InputStream(samplerate=fs, channels=1) as stream:
                # 最初に無音状態が終わるまでは録音せずに待機
                while True:
                    data, overflowed = stream.read(int(fs * min_duration))
                    if overflowed:
                        print("Overflow occurred. Some samples might have been lost.")
                    if np.any(np.abs(data) >= amplitude_threshold):
                        recorded_audio.append(data)
                        break

                # 録音を開始してから無音状態になるまでループ
                while True:
                    data, overflowed = stream.read(int(fs * min_duration))
                    if overflowed:
                        print("Overflow occurred. Some samples might have been lost.")
                    recorded_audio.append(data)
                    if np.all(np.abs(data) < amplitude_threshold):
                        silent_time += min_duration
                        if silent_time >= silence_threshold:
                            break
                    else:
                        # 一定時間経過で話し途中でもwavファイル作成
                        speak_time += min_duration
                        if speak_time >= out_duration:
                            file_path = audio_directory / f"{file_name}_{speak_cnt}.wav"
                            speak_time = 0
                            speak_cnt += 1
                            audio_data = np.concatenate(recorded_audio, axis=0)
                            audio_data = np.int16(audio_data * 32767)
                            write(file_path, fs, audio_data)
                        silent_time = 0
        except Exception as e:
            print(f"Error in record_audio: {e}")
            continue

        # 無音検知によるwavファイル作成
        file_path = audio_directory / f"{file_name}_latest.wav"
        audio_data = np.concatenate(recorded_audio, axis=0)
        audio_data = np.int16(audio_data * 32767)
        write(file_path, fs, audio_data)

文字起こし処理(フォルダ監視とtranscribe)

watchdogライブラリを使用してフォルダを監視しています。
監視対象のフォルダにファイルが作成されたことを検知して、文字起こし処理を実行します。

喋り途中ファイルの場合は、現段階より先のwavファイルがない場合のみ処理を行います。

class FileHandler(FileSystemEventHandler):
    def on_created(self, event):
        if event.is_directory:
            return

        file_name, file_ext = os.path.splitext(event.src_path)
        if not file_name.endswith("_latest"):
            base_name = file_name.rsplit('_', 1)[0]
            suffix = int(file_name.rsplit('_', 1)[-1])
            if os.path.exists(os.path.join(os.path.dirname(event.src_path), f"{base_name}_latest.wav")):
                # 最終ファイルがあるので処理不要、ファイル削除してスキップ
                os.remove(event.src_path)
                return
            if os.path.exists(os.path.join(os.path.dirname(event.src_path), f"{base_name}_{suffix + 1}.wav")):
                # 次ファイルがあるので処理不要、ファイル削除してスキップ
                os.remove(event.src_path)
                return

        # 文字起こしして、ファイルを削除
        self.process_file(event.src_path)
        os.remove(event.src_path)

    def process_file(self, file_path):
        # 文字起こし
        transcription = self.transcribe(file_path)

        # ハルシネーションで出力された可能性のある場合は処理しない
        if transcription in HALLUCINATION_TEXTS:
            return

        if transcription:
            if "latest" in str(file_path):
                # 最終ファイルの場合、そのまま出力
                print(transcription)
            else:
                # 喋っている途中の文字起こしは《》で囲う
                print("《"+transcription+"》")

    def transcribe(self, file_path):
        try:
            with open(file_path, 'rb') as audio_file:
                segments, _ = model.transcribe(audio_file, language="ja", beam_size=5, patience=0.5)
                transcription = ''.join(segment.text for segment in segments)
            return transcription
        except Exception as e:
            print(f"Error in transcribe: {e}")
            return ""

実行結果

「whisperを使った文字起こしです」と発声してみました。
《》がついているものが途中経過です。喋り切っていないwavファイルを使用しているので、最初は全く違う言葉ですが徐々に正確になっていますね。

実際に使用する際には最終結果のみでいいかもですが、生放送とかで使用する場合は途中経過もあった方がよりリアル感が増しますね。


改善点

私の滑舌やイントネーションが悪いのか、認識しづらい文字はとことん認識しづらい印象でした。
ノイズ除去等の前処理を全く行っていないので、その辺を実装すると改善されるかも?しれません。

気が向いたらやってみようと思います。