Data-Dive

Near real-time transcription of a live audio stream with OpenAI Whisper for keyword monitoring

· mc51

In this post, I demonstrate how to transcribe a live audio-stream in near real time using OpenAI Whisper in Python. We do this to monitor the stream for specific keywords. Also, the transcribed text is logged with timestamps for further use. Using fuzzy matching on the transcribed text, we find mentions of our keywords. Then, we trigger a message via Signal messenger to a group or person that contains the relevant part of the spoken passage.

Background

This was a quick POC built on a weekend: I wanted to monitor a local radio station for the mention of some keywords in order to win a competition. This needed to be done quickly, which resulted in a simple solution. Also, it had to be as resource efficient as possible to minimize infrastructure costs. While it was not built with stability as the main focus, it actually performed flawlessly for several weeks without any downtimes. Hence, the goal was achieved!
All the code is available in this repo. In the following, I will go over the overarching structure of the solution and explain some of the relevant parts of the code.

Overview

The solution consists of three parts:

  1. save_stream.py continuously saves .mp3 files in chunks of 30sec from a live audio stream
  2. transcribe.py permanently transcribes each audio chunk using OpenAI Whisper. Then, it uses fuzzy matching to monitor the spoken word for our keywords. On match, it calls msg_group_via_signal.sh
  3. msg_group_via_signal.sh relays the alarm message to the signal-cli tool which messages a group on the Signal messenger

We use OpenAI’s Whisper as it is currently one of the best performing models for audio transcription. Moreover, it’s easily available and comes in different model sizes. Using the small model, we achieve decent results even on non-english audio. In addition, it’s resource efficient enough to be run on a CPU without falling behind the stream. I had good results deploying this on a c5a.large EC2 machine on AWS, costing ~65$ per month. To account for the non perfect transcription quality, we use fuzzy search when looking for our keywords in the transcription. Thereby, we reduce false-negatives (but increase false-positive) alarms. With better specs / a GPU you can increase model size for better quality transcriptions.

Details

Let’s take a look at some of the code. We start with save_stream.py. It mainly consists of this function:

def record_stream_to_file(stream: requests.Response):
    """Record stream audio to files as .mp3 in chunks during recording times
    Args:
        stream (requests.Response): Audio stream
    """
    start_utc = datetime.utcnow()
    start_local = datetime.now(tz=LOCAL_TZ)
    current_local_time = start_local.time()
    log.info(
        "Current tz time: %s. Stream from: %s Stream until: %s",
        current_local_time,
        STREAM_TIME_FROM,
        STREAM_TIME_TO,
    )
    if not STREAM_TIME_FROM < current_local_time < STREAM_TIME_TO:
        log.warning("Not during recording time")
        sys.exit(0)
    filename = DATA_PATH + "/stream_" + start_utc.isoformat(timespec="seconds") + ".mp3"
    log.info("Writing stream to: %s", filename)
    with open(filename, "wb") as file:
        try:
            for block in stream.iter_content(1024):
                file.write(block)
                if datetime.utcnow() - start_utc > timedelta(
                    seconds=CHUNK_TIME_SECONDS
                ):
                    file.close()
                    record_stream_to_file(stream)
        except KeyboardInterrupt:
            log.info("Received keyboard interrupt")
            sys.exit(0)

It’s pretty much straight forward: In the beginning, we check if the current local time lies within a pre-defined period. This is the period that we want to monitor. Outside of this period, we just quit the script. For deploying, I created a simple bash script that checks if the python script is already running and starts it otherwise. Then, I used cron to start this bash script every minute. Thus, making sure that our save_stream.py only records when wanted. Simple enough! Who needs Airflow anyway?
In the next step, we iterate over the request.Reponse object (the opened stream) chunk-wise. We write each chuck to a file until we hit a time limit (for me it was 30s), then we recursively call the function again.
As a result, we will have a constant stream of .mp3 files of 30s length stored in a local folder and ready for transcription.

Next, we focus on the transcribe.py script. This is the heart of our solution. Similar to the first script, this also only keeps running within our monitoring period of the day otherwise it just stops.
The first step is to get all recent .mp3 files from the upstream script:

def get_recent_files() -> list:
    """Return file paths for recently created files
    Returns:
        list: File paths
    """
    log.info("Listing recent files")
    now = datetime.utcnow()
    audio_files = []
    for file in sorted(Path(PATH_AUDIO_FILES).iterdir()):
        if ".mp3" in file.name:
            file_ts = datetime.fromtimestamp(file.stat().st_ctime)
            if now - file_ts <= timedelta(minutes=RECENT_FILES_TIME_MIN):
                audio_files.append(file)
    log.debug("Recent files: %s", audio_files)
    return audio_files

We sort the files in the audio folder by their timestamp and check if they are within our recency limit. If so, they are added to a list and will be processed. This is a measure to make our code more robust: If the transcription script fails and exists at any point it will be restarted by cron. We want it to go on transcribing, but only for the most recent minutes of the stream. Otherwise, it would fall behind and not be real-time anymore.
The transcription functionality itself is super simple:

def transcribe_file(model, options, file_path: str) -> str:
    """Transcribe the .mp3 file to text
    Args:
        model: Whisper Model
        file_path (str): File path
    Returns:
        str: Transcribed text
    """
    audio = whisper.load_audio(file_path)
    audio = whisper.pad_or_trim(audio)
    mel = whisper.log_mel_spectrogram(audio).to(model.device)
    result = whisper.decode(model, mel, options)
    return result.text  # type: ignore

The Whisper library takes care of all the heavy lifting for us. Consequently, the loading, pre-processing and decoding of our audio files requires four lines of code. That’s a good moment to appreciate the simplicity of Python and the hard work that open-source developers put in regularly! If you want to learn more about the signal processing needed to create the model inputs, I recommend the following read. And the official blog post introducing whisper makes for another fascinating read.
Now, that we have transformed speech to text, we can monitor the text for our keywords:

def search_for_text(text: str):
    """Search for search term in text and send alarm if found"
    Args:
        text (str): Text to search
    """
    log.info("Searching in text")
    text = text.lower()

    for term in SEARCH_TERMS_LIVE:
        results = find_near_matches(term, text, max_l_dist=2)
        if results:
            log.debug("Search results: %s", results)
            log.info("Found live term: %s", term)
            send_alarm_to_signal(text, live=True)

    for term in SEARCH_TERMS_DEV:
        results = find_near_matches(term, text, max_l_dist=1)
        if results:
            log.debug("Search results: %s", results)
            log.info("Found dev term: %s", term)
            send_alarm_to_signal(text, live=False)

We use the fuzzysearch package for comparing the transcribed text to our keywords. This allows us to match our keywords, even if they have not been perfectly transcribed. Thereby, decreasing the chance for false-negatives. The simple and fast Levenshtein distance algorithm is applied here.
Finally, on a keyword match, we call the send_alarm_to_signal function:

def send_alarm_to_signal(text: str, live=False):
    """Send alarm via signal bash script
    Args:
        text (str): Text with match
        live (bool, optional): Live or test. Defaults to False.
    """

    message = "This is a test. I've picked up the following: \n"
    if live:
        message = "This is a LIVE. I've picked up the following:\n"
    message = message + text
    subprocess.Popen([PATH_TO_SIGNAL_SCRIPT, message])

This simply calls a bash script and relies the part of the transcribed text with a match to it. The bash script, again, is very simple:


#!/bin/bash
echo "Sending message:"
echo "$1"
echo ""
echo "$1" | /usr/local/bin/signal-cli send --message-from-stdin -g "MyGroupID"

It calls the signal-cli tool, echoing the relied message via stdin. When correctly setup (follow the Quickstart), you can use the tool to act as a Signal Messenger bot. In this case, we use it to message a group, alarming it about the keyword match in the audio stream!