Code Structure | Mod9 ASR Engine

Overview of the Engine Library (mrpboost)

This directory contains library code for use in the Engine, internally named mrpboost.

Boost is used extensively in this library. There are some instances where functionality is available in both the boost namespace and the std namespace. In these cases, we generally prefer std except when functionality or compatibility depends on the boost version.

As usual with Kaldi library directories, the source files in this directory are compiled into a library (kaldi-mrpboost.a or libkaldi-mrpboost.so), which is linked into executables that use the library. The code for the executables that use this library can be found under src/mrpboostbin.

One difference from a standard Kaldi library directory is that we use some additional Makefile rules stored in extra.mk. These rules ensure that executables that use the library produced by the code in this directory compile and link correctly. This mostly involves pointing to Boost libraries and to other third-party libraries that the executable or the library depends on (JSON parsing, sample rate conversion, TensorFlow, etc.). These third-party libraries should all have been installed as build dependencies (see MOD9_BUILD_ENGINE.md in the top-level directory).

Note that the .h header files contain API-level documentation, whereas the .cc source files contain comments related to implementation.

Commands and Tasks

Many of the files in this directory implement requests that a client can send to the Engine server. This is done by subclassing Command, which can be found in command.{cc,h}. A Command is created when a client sends a request (a newline-terminated JSON string).

A Command is known as "lightweight" if it runs with minimal resource requirements and should not count towards the resource limits specified when the Engine starts (e.g. --limit.threads). Otherwise, the Command is known as "heavy". A Task is a subclass of Command that is always "heavy" and should be used instead of Command if the request will use significant resources.
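The distinction between lightweight and heavy commands can be sketched as a small class hierarchy. This is a minimal illustration, not the Engine's actual declarations (those live in command.{cc,h}): the method is_heavy(), the helper threads_to_reserve(), and the two example subclasses are hypothetical names, and treating get-info as lightweight is an assumption made only for the example.

```cpp
#include <cassert>
#include <string>

// Hypothetical skeleton; the real Command and Task are declared in command.{cc,h}.
class Command {
 public:
  virtual ~Command() = default;
  // Lightweight by default: does not count towards limits such as --limit.threads.
  virtual bool is_heavy() const { return false; }
  virtual std::string name() const = 0;
};

// A Task is a Command that is always heavy, so subclasses cannot opt out.
class Task : public Command {
 public:
  bool is_heavy() const final { return true; }
};

// Assumed to be lightweight purely for illustration.
class GetInfoCommand : public Command {
 public:
  std::string name() const override { return "get-info"; }
};

// A heavy request that uses significant resources.
class RecognizeTask : public Task {
 public:
  std::string name() const override { return "recognize"; }
};

// Number of threads to reserve against an operator limit (hypothetical helper).
inline int threads_to_reserve(const Command& cmd, int requested_threads) {
  assert(requested_threads >= 1);
  return cmd.is_heavy() ? requested_threads : 0;
}
```

Marking is_heavy() as final in Task is one way to encode "always heavy" in the type system, so that code enforcing resource limits only needs a reference to the base class.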

Streaming Tasks

Tasks that need to stream data to and from a client should be subclasses of TaskStreamBase, which is defined in task-stream-base.{cc,h}. This base class handles the audio data portion of a request that is streamed from the client, after the request's initial newline-delimited JSON options have already been read and processed in server.cc.

A subclass of TaskStreamBase should handle anything specific to the subclass, such as converting WAV-formatted audio files, performing automatic speech recognition, etc.

Decode Tasks

There are several files that implement the client-requested recognize command. Originally, the recognize command was known as decode; this is reflected in the current naming for the files that implement the recognize command.

Separate processing workflows are implemented for online vs. batch operation. Depending on the options passed with the recognize command, different Task subclasses are invoked:

  • If batch-threads is not passed or is set to 0, and batch-intervals is not passed, then the command is handled by TaskDecode (defined in task-decode.{cc,h}). This online mode of operation is recommended for live streaming audio in real-time.
  • If batch-threads != 0 and batch-intervals is not set, then command.cc routes it as decode-parallel-vad, which is handled by TaskDecodeParallelVad (defined in task-decode-parallel-vad.{cc,h}). This uses Voice Activity Detection from libfvad.
  • If batch-intervals is set, then the command is rewritten in command.cc to be decode-parallel-intervals, which is handled by TaskDecodeParallelIntervals (defined in task-decode-parallel-intervals.{cc,h}). This is for client-provided VAD segmentation.
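The three routing rules above can be condensed into one function. The sketch below is only an illustration of the decision order: RecognizeOptions and route_recognize() are hypothetical names, and the real logic lives in command.cc and operates on the parsed JSON request rather than on a struct like this.

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified view of the relevant request options.
struct RecognizeOptions {
  bool has_batch_threads = false;    // was "batch-threads" passed?
  int batch_threads = 0;             // its value, if passed
  bool has_batch_intervals = false;  // was "batch-intervals" passed?
};

// Returns the name of the Task subclass that would handle the request.
inline std::string route_recognize(const RecognizeOptions& opts) {
  // The value is only meaningful when the option was actually passed.
  assert(opts.has_batch_threads || opts.batch_threads == 0);
  // Client-provided segmentation takes precedence over everything else.
  if (opts.has_batch_intervals) return "TaskDecodeParallelIntervals";
  // A non-zero thread count selects batch mode with VAD segmentation.
  if (opts.has_batch_threads && opts.batch_threads != 0) return "TaskDecodeParallelVad";
  // Otherwise fall back to the online mode.
  return "TaskDecode";
}
```

Checking batch-intervals first mirrors the list: the first two rules both require that batch-intervals is not set, so it effectively has the highest priority.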

This diagram shows inheritance among classes that implement the recognize command.

                         Command
                            |
                          Task
                            |
                      TaskStreamBase
                            |
                      TaskDecodeBase
                      |            |
                 TaskDecode    TaskDecodeParallelBase
                                |                  |
                 TaskDecodeParallelVad    TaskDecodeParallelIntervals

Lifespan of a Request

This section describes what happens in the Engine in response to a request from a client. The RAII (Resource Acquisition Is Initialization) pattern is used extensively to ensure resources (e.g. threads, memory) are freed when no longer needed. The Factory pattern is used to create a subclass of Command based on the JSON sent by the client.
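As a rough illustration of how these two patterns combine, the sketch below maps a command name to a creator function (Factory) and returns the result in a std::unique_ptr so that it is destroyed automatically (RAII). It is not the Engine's Command::from(): the function make_command(), the two stand-in subclasses, and the choice to return nullptr for unknown names are all assumptions made for the example.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>

// Stand-in types; the real classes are declared in command.{cc,h}.
struct Command {
  virtual ~Command() = default;
  virtual std::string name() const = 0;
};
struct GetInfo : Command {
  std::string name() const override { return "get-info"; }
};
struct Recognize : Command {
  std::string name() const override { return "recognize"; }
};

// Factory: look up a creator by the requested command name.
// An empty name selects "recognize"; unknown names yield nullptr.
inline std::unique_ptr<Command> make_command(const std::string& requested) {
  typedef std::function<std::unique_ptr<Command>()> Creator;
  static const std::map<std::string, Creator> creators = {
      {"get-info", [] { return std::unique_ptr<Command>(new GetInfo()); }},
      {"recognize", [] { return std::unique_ptr<Command>(new Recognize()); }},
  };
  const std::string name = requested.empty() ? "recognize" : requested;
  const auto it = creators.find(name);
  if (it == creators.end()) return nullptr;
  std::unique_ptr<Command> cmd = it->second();
  assert(cmd != nullptr);  // every registered creator must produce an object
  return cmd;              // RAII: the caller's unique_ptr frees it on scope exit
}
```

A caller would write something like `auto cmd = make_command("get-info");` and never call delete; the object is released when cmd goes out of scope, including on early returns and exceptions.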

Note that various Engine settings are defined in engine-settings.{cc,h} and are in the namespace Engine. For example, the integer Engine::read_line_limit is the maximum allowable line length for specifying the initial JSON request options.
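To show how a setting such as Engine::read_line_limit might be applied, here is a minimal sketch that inspects the bytes buffered so far and decides whether a complete request line has arrived, more bytes are needed, or the line is too long. The names kReadLineLimit, LineStatus, and extract_request_line() are hypothetical, and whether the real limit counts the trailing newline is not stated in this document, so the sketch assumes it does not.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for Engine::read_line_limit (1 MiB, assumed here).
const std::size_t kReadLineLimit = 1024 * 1024;

enum class LineStatus { kComplete, kNeedMore, kTooLong };

// Looks for the first newline in the buffered bytes. On success, copies the
// line (without the newline) into *line; any bytes after it are audio data.
inline LineStatus extract_request_line(const std::string& buffer,
                                       std::size_t limit, std::string* line) {
  assert(line != nullptr);
  const std::size_t pos = buffer.find('\n');
  if (pos == std::string::npos) {
    // No newline yet: keep reading unless the limit is already exceeded.
    return buffer.size() > limit ? LineStatus::kTooLong : LineStatus::kNeedMore;
  }
  if (pos > limit) return LineStatus::kTooLong;
  *line = buffer.substr(0, pos);
  return LineStatus::kComplete;
}
```

Rejecting as soon as the buffered size exceeds the limit, rather than waiting for a newline, bounds the memory a single misbehaving client can consume.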

When the Engine executable starts, a server object is created using Server::create() (defined in server.{cc,h}) that runs in a separate thread and begins listening for requests on a socket (see Server::start_accept()). The main thread primarily listens for signals, such as SIGTERM. The server also spawns another thread to collect system stats like CPU and memory usage. If NLP models are loaded, three other threads will be started by TensorFlow.

Consider a recognize request with batch-threads set to 4:

(echo '{"batch-threads": 4}'; curl -sL https://mod9.io/swbd.wav) | nc $HOST $PORT

Such a request goes through the following set of steps.

  1. The server object asynchronously (via Boost's asio) accepts the new connection request. Each request is associated with its own handler object (defined in handler.h) that manages the socket, Boost asio's IO context and guard objects, and a buffer object for storing incoming bytes.

  2. If the Engine is not shutting down, then the server first calls Server::start_accept() where it continues to listen for more requests.

  3. The server object asynchronously reads the initial JSON request string from the socket into the handler's buffer. If the handler does not receive the complete JSON request within 60s (see Engine::read_line_timeout_ms) or the JSON line is longer than 1MiB (Engine::read_line_limit) then the request is terminated. In all cases where a request is rejected or does not successfully execute, the server responds with a line reporting {"status": "failed"} before closing the connection.

  4. The server object then calls Command::from() (defined in command.{cc,h}) which determines which specific Command or Task is being requested. In the above example, a "command" request option is not specified which implies the default command of "recognize". However, if the server has not finished loading models, i.e. it is not ready to accept "heavy" requests, then the request is rejected since "recognize" is considered a "heavy" command. Since the batch-threads request option is non-zero, a TaskDecodeParallelVad object named cmd is instantiated (see Decode Tasks for more information). The configure() method is called on the cmd to validate the request options and whether the request can execute given the Engine's request and thread limits, if any. It also selects ASR, NLP, and G2P models to use, defaulting to the first of each model loaded if the client has not requested a specific model. Since the above example involves a WAV-formatted file, a flag is set to indicate that the WAV header must be processed. This step is internally referred to as the "preload" processing.

  5. Once cmd is validated, a thread named heavy_thread is spawned to run the task. Control proceeds to Server::handle_accept(), which spawns another thread named request to resume IO operations and clean up the connection once the request completes. These objects are of type boost::thread and not std::thread; an important distinction is that the former detach in their destructors when they go out of scope, whereas destroying a joinable std::thread calls std::terminate().

  6. The heavy_thread starts execution in TaskStreamBase::run(). First, a consumer thread is created to process the audio. This thread calls TaskDecodeBase::runTask() which notifies the client that the Engine is ready to accept audio, and awaits preload processing.

  7. The current heavy_thread asynchronously reads (see TaskStreamBase::on_read()) a fixed number of bytes (typically 128 bytes, see Engine::read_stream_buf_size) into the preload buffer. Note that TaskStreamBase has two separate buffers: a preload buffer to process a WAV-formatted file header and a main buffer to process audio bytes. TaskDecodeBase::processPreload() then attempts to process the header and extract information like number of audio bytes, number of bytes in a sample (block alignment), number of channels, etc. If there are not enough bytes in the preload buffer to successfully process the header, more bytes are read (up to a max of 1MiB, see Engine::riff_header_limit) until the header can be processed successfully.

  8. The heavy_thread will then submit() sample-aligned bytes into an object of type StreamBuffer. This class handles synchronization of reads and writes into a queue. It is also used as a means of communication between the heavy_thread and consumer thread by storing whether the request is aborted, end-of-stream is reached, or the consumer thread is done reading audio (applicable in case of batch-intervals).

  9. In parallel, once the WAV header preload processing finishes, the consumer thread determines the audio's sample rate and encoding and configures the sample rate conversion object. The Engine uses the libsamplerate (SRC) library to perform sample rate conversion; this is only needed if the sample rate of the audio does not match the rate used when training the selected ASR model. For internal processing, the audio encoding is always converted to 16-bit signed integer PCM.

  10. The consumer thread then calls TaskDecodeParallelVad::runTaskBase(). Here, another thread called write_thread is created to send results to the client. Then VAD-related settings are configured. The Engine uses the libfvad library for performing VAD. A thread pool is created with 4 threads as requested by the client in the above example request. This thread pool performs the actual ASR decoding of the prepared audio segments.

  11. Control then proceeds to a while loop that runs as long as there is audio to be processed. It reads data from the StreamBuffer queue whenever data is available. The next() method that does this read also performs sample rate conversion on the data, if needed. In the while loop, audio bytes are accumulated until a segment is created (see task-decode-parallel-vad.cc for comments explaining how a VAD endpoint is determined). A promise object is created to get the result JSON and the future to this object is stored so that the write_thread can send results to the client in order. The audio segment is submitted to the thread pool to be decoded via TaskDecodeParallelBase::decodeAudio().

  12. This decodeAudio() function sets decoder options, creates a decoder object, extracts features from the segment's samples and performs decoding. A result JSON object is created with required response fields (e.g. .transcript, .status, etc.) and additional fields as may be requested (e.g. word-level timestamps, alternatives, etc.). The JSON-encoded string representation of this nlohmann::json result object is stored in the 'future' for the write_thread to process.

  13. If at any point in the asynchronous read process there is a delay of more than 10s (see Engine::read_stream_timeout_ms), then the request is aborted and a failure status is sent to the client. Otherwise, once heavy_thread finishes reading the audio stream, i.e. eos() is reached, it sets a corresponding flag in the StreamBuffer object and waits for the consumer thread to finish its processing.

  14. The consumer thread finishes once all the audio data is decoded and replies are sent to the client by the write_thread. Now that the request is completed, the request thread mentioned above closes the connection and performs garbage collection if the Engine was compiled with certain memory management libraries (e.g. libtcmalloc).
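The incremental header processing described in step 7 can be sketched as a parser that reports whether it needs more bytes. This is a simplified illustration based on the standard RIFF/WAVE layout, not the Engine's TaskDecodeBase::processPreload(): WavInfo, WavStatus, and parse_wav_header() are hypothetical names, and the sketch omits the upper bound that Engine::riff_header_limit would impose.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

struct WavInfo {
  std::uint16_t channels = 0;
  std::uint32_t sample_rate = 0;
  std::uint16_t block_align = 0;  // bytes per sample across all channels
  std::uint32_t data_bytes = 0;   // size declared by the data sub-chunk
  std::size_t header_bytes = 0;   // offset at which audio bytes begin
};

enum class WavStatus { kOk, kNeedMore, kInvalid };

// WAV fields are little-endian.
inline std::uint16_t read_u16(const std::uint8_t* p) {
  return static_cast<std::uint16_t>(p[0] | (p[1] << 8));
}
inline std::uint32_t read_u32(const std::uint8_t* p) {
  return static_cast<std::uint32_t>(p[0]) | (static_cast<std::uint32_t>(p[1]) << 8) |
         (static_cast<std::uint32_t>(p[2]) << 16) | (static_cast<std::uint32_t>(p[3]) << 24);
}

// Walks the sub-chunks until "data" is found. Returns kNeedMore when the
// buffer ends before the header does, so the caller can read more bytes.
inline WavStatus parse_wav_header(const std::vector<std::uint8_t>& buf, WavInfo* info) {
  assert(info != nullptr);
  if (buf.size() < 12) return WavStatus::kNeedMore;
  if (std::memcmp(&buf[0], "RIFF", 4) != 0 || std::memcmp(&buf[8], "WAVE", 4) != 0) {
    return WavStatus::kInvalid;
  }
  bool have_fmt = false;
  std::size_t pos = 12;
  while (true) {
    if (buf.size() < pos + 8) return WavStatus::kNeedMore;
    const std::uint8_t* chunk = &buf[pos];
    const std::uint32_t size = read_u32(chunk + 4);
    if (std::memcmp(chunk, "data", 4) == 0) {
      if (!have_fmt) return WavStatus::kInvalid;
      info->data_bytes = size;
      info->header_bytes = pos + 8;
      return WavStatus::kOk;
    }
    // Sub-chunks are padded to an even number of bytes.
    const std::size_t padded = static_cast<std::size_t>(size) + (size & 1u);
    if (buf.size() < pos + 8 + padded) return WavStatus::kNeedMore;
    if (std::memcmp(chunk, "fmt ", 4) == 0) {
      if (size < 16) return WavStatus::kInvalid;
      info->channels = read_u16(chunk + 10);
      info->sample_rate = read_u32(chunk + 12);
      info->block_align = read_u16(chunk + 20);
      have_fmt = true;
    }
    pos += 8 + padded;  // skip to the next sub-chunk
  }
}
```

A caller would append each newly read block to the buffer and call parse_wav_header() again while it returns kNeedMore; once it returns kOk, everything from header_bytes onward is audio.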

This concludes the processing of the request. Note that every time a new thread is spawned to handle a new request, Boost logging thread-scope attributes are set so that the log statements are printed with the correct metadata for that request. The request is also associated with both a random UUID and an incrementing request number, to facilitate tracing and debugging.
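Step 11 relies on promises and futures to deliver results in audio order even though segments may finish decoding out of order. The sketch below shows that idea with the standard library: futures are queued in segment order, and the writer always waits on the oldest one. OrderedResults and its methods are hypothetical names, and the class is deliberately not thread-safe; a real implementation would protect the queue with a mutex, since it is shared between the consumer thread and the write_thread.

```cpp
#include <cassert>
#include <deque>
#include <future>
#include <string>

// Hypothetical helper: a queue of futures kept in segment order.
class OrderedResults {
 public:
  // Called once per segment, in audio order. The returned promise is handed
  // to whichever worker decodes that segment.
  std::promise<std::string> add_segment() {
    std::promise<std::string> promise;
    pending_.push_back(promise.get_future());
    return promise;
  }

  // Called by the writer: blocks until the oldest segment's result is ready,
  // regardless of which later segments have already finished.
  std::string next() {
    assert(!pending_.empty());
    std::string result = pending_.front().get();
    pending_.pop_front();
    return result;
  }

  bool empty() const { return pending_.empty(); }

 private:
  std::deque<std::future<std::string> > pending_;
};
```

For example, if the second segment's promise is fulfilled before the first, next() still returns the first segment's result first, because it waits on the front of the queue.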

Integration with Kaldi Libraries

The majority of the changes from core Kaldi are in the directories src/mrp, src/mrpbin, src/mrpboost, and src/mrpboostbin. The changes to core Kaldi are primarily in wave file handling (src/feat/wave-reader.{cc,h}) and encryption/compression (src/util/kaldi-io.cc and files matching src/util/mrp-*).

If you want to extend the Engine with additional Kaldi (or OpenFST) functionality (e.g. lattice rescoring), the primary files you will have to modify will likely be src/mrpboost/asr.{cc,h} and src/mrpboost/task-decode-base.{cc,h}. The class ASRConfig (defined in asr.{cc,h}) handles Kaldi-style argument handling. For example, if a new feature requires a model file, you should add the command line handling code to ASRConfig, the model file itself to the model directory, and the command line text (e.g. --rescore-lm=graph/rescore.4gram.lm) to conf/model.conf in the model directory.
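For reference, the shape of a Kaldi-style option line such as the one above can be illustrated with a small parser. This is only a sketch of the "--name=value" convention: parse_option() is a hypothetical helper, and the Engine itself relies on Kaldi-style argument handling in ASRConfig rather than on code like this.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Splits text of the form "--name=value" into its two parts.
// Returns false if the text does not have that form.
inline bool parse_option(const std::string& text, std::string* name,
                         std::string* value) {
  assert(name != nullptr && value != nullptr);
  if (text.size() < 3 || text.compare(0, 2, "--") != 0) return false;
  const std::size_t eq = text.find('=');
  if (eq == std::string::npos || eq == 2) return false;  // missing "=" or empty name
  *name = text.substr(2, eq - 2);
  *value = text.substr(eq + 1);
  return true;
}
```

With the example from the text, parsing "--rescore-lm=graph/rescore.4gram.lm" yields the name "rescore-lm" and the value "graph/rescore.4gram.lm".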

The class ASRInfo (defined in asr.{cc,h}) loads and stores information provided by ASRConfig. To continue the example of language model rescoring, you should add a variable of type ConstArpaLm to store the language model file, and add code to the ASRInfo constructor to load the file that was specified in ASRConfig.

The majority of the code related to recognition is in the files matching src/mrpboost/task-decode-*.{cc,h}. The post-decoding processing is concentrated in the method prepare_json_response() in src/mrpboost/task-decode-base.cc. This method takes a decoder object containing a completed recognition (e.g. it has reached either an endpoint or end of file) and returns a JSON object to be returned to the client based on the options provided by the client and passed to prepare_json_response(). For the example of language model rescoring, you would first make sure the variable need_lattice is set correctly (since language model rescoring is typically performed on a lattice), and then add the code that actually rescores the lattice shortly after the call to the Kaldi decoder method decoder.GetLattice().

Suggested Enhancements

Some potential enhancements that the Mod9 team has not yet implemented:

  • (#722) Rather than abstracting various ASR decoder settings under the speed request option, taking a value between 1 and 9, it would be nice for advanced clients to specify the precise beams that they desire. This should be subject to operator limits (#721) which should be reported by the get-info command (#723).
  • (#699) If the partial and endpoint request options are both false, then it doesn't make sense for a client to specify a small value for the latency request option; in this case, it should be set as high as possible.
  • (#673) It should be possible for add-words to modify a copy of a request-specific graph, or perhaps a named client-specific graph, rather than modifying a graph that is shared with all other clients across their requests.
  • (#661) The Engine doesn't implement write timeouts as extensively as it does read timeouts.
  • (#531) The code could be improved by always using json explicitly instead of auto.
  • (#600) Integrate ffmpeg libraries to support non-WAV audio formats and encodings.
  • (#680) Allow a custom grammar to be saved and loaded similarly to ASR models.
  • (#690) Allow loading an empty graph to support recognition from just a custom grammar.
  • (#659) Support ASR models that do not have position-dependent phones.
  • (#639) Use fewer threads for each request, e.g. cooperative multi-tasking or callback-based pattern.
  • (#555) Enable audio-uri request option for downloading remote audio files.
  • (#468) Download models from remote URIs at runtime.
  • (#679) Add support for language identification.
  • (#348) Allow a client-specified transcript to be inserted into a decoded lattice as the 1-best path.
  • (#492) Handle SIGINT and SIGQUIT (not just SIGTERM).
  • (#605) If --models.mutable is disabled, load a constfst without conversion to vectorfst.
  • (#565) The get-models-info command should list loadable models if --models.mutable is enabled.
  • (#719) The get-models-info command should accept {asr,g2p,nlp}-model as request options.
  • (#590) Expose a --models.load-all option to load all available models during Engine startup.
  • (#508) Use VAD for non-batch streaming (i.e. task-decode.{cc,h}) to reduce CPU usage during silence.
  • (#301) The way default values are set in task-decode-base.h is kinda ugly.
  • (#163) Allow multiple acoustic models to be loaded, and perform score-level fusion.
  • (#677) Accelerate sample rate conversion (see https://github.com/libsndfile/libsamplerate/issues/176).
  • (#613) Log and/or report usage metrics, to facilitate usage-based licensing.
  • (#572) Allow the ~ character in model paths (see https://stackoverflow.com/questions/33224941).
  • (#724) NLP processing should use a limited-size context for long input strings.
  • (#373) Indicate the stability of partial results.
  • (#597) Specify a --host=127.0.0.1 flag to only listen for local connections.
  • (#48) Enable Unix domain sockets for most efficient local usage.

Known Issues

There are several known issues that the Mod9 team has not been able to address:

  • (#533) For strict compliance with C++17, the use of deprecated std::random_shuffle must be removed from the core Kaldi libraries in the upstream project (which is still using C++14). While allowed under gcc9, it prevents the Engine from building with clang12.
  • (#710) The content-length request option is ignored for raw-formatted audio.
  • (#675) The handling of cgroups v2 is somewhat limited, so determination of memory limits may be incorrect for more complicated setups that require parsing hierarchical cgroups.
  • (#627) There is no logging when requests are rejected due to the Engine's inability to accept new requests, e.g. if no threads are available or the Engine is shutting down.
  • (#696) It is possible for the WAV format to specify additional sub-chunks after the data sub-chunk; the Engine will consider these to be unexpected bytes and log a warning.
  • (#718) Non-recognize commands should error if unexpected options are requested.
  • (#427) Requesting batch-intervals may result in harmless warnings due to rounding.
  • (#359) The Engine might set a .warning message multiple times within a single request (especially at the start of processing). Each of these assignments will overwrite the previous warning and so the user will only see the last one.
  • (#254) Requests that attempt to modify a decoding graph may be relatively unlikely to acquire a unique lock on the graph's mutex when many other decoding threads are actively holding a non-exclusive lock on that same shared mutex.
  • (#676) Batch-mode processing with VAD won't work for ASR models with sample rates other than 8kHz, 16kHz, 32kHz, or 48kHz. This is because libfvad (which is derived from the WebRTC VAD) only supports those specific sample rates. The solution is to use libsamplerate to resample accordingly. However, it is very unusual to train ASR models at sample rates other than 8kHz or 16kHz.
  • (#221) Submitting a 44-byte WAV file (all header, no data) is considered an error. Some ASR vendors reference this test file, which should be recognized as zero-duration silence.
  • (#581) It is possible to submit a WAV-formatted file to the Engine without any request options specified as a preceding line of JSON (i.e. {"command":"recognize"} is implied). This is a violation of the Engine's published application protocol, but the functionality is intentional and convenient. Consider it an undocumented "Easter egg" :-)
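As an illustration of the workaround suggested for issue #676, the sketch below checks whether a model's sample rate is one of the four rates listed there and, if not, picks a rate to resample to before running VAD. Both function names are hypothetical, and the policy of choosing the next higher supported rate is an assumption made for the example, not something the Engine implements.

```cpp
#include <cassert>

// The four rates listed in issue #676 as supported by the VAD library.
inline bool vad_supports_rate(int sample_rate_hz) {
  return sample_rate_hz == 8000 || sample_rate_hz == 16000 ||
         sample_rate_hz == 32000 || sample_rate_hz == 48000;
}

// Assumed policy: use the smallest supported rate that is not below the
// model's rate, so no bandwidth is discarded; cap at 48 kHz.
inline int vad_rate_for_model(int model_rate_hz) {
  assert(model_rate_hz > 0);
  const int supported[] = {8000, 16000, 32000, 48000};
  for (int rate : supported) {
    if (rate >= model_rate_hz) return rate;
  }
  return 48000;
}
```

Under this policy a model trained at 11025 Hz would have its audio resampled to 16000 Hz for VAD only, while recognition would continue to use the model's native rate.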