From f13cf61b05432a989e6a42c95b843a56639bcbde Mon Sep 17 00:00:00 2001 From: Fangjun Kuang Date: Tue, 6 Dec 2022 16:34:27 +0800 Subject: [PATCH] Convert conv-emformer to ncnn (#717) * Export conv-emformer via torch.jit.trace() --- ...former-transducer-stateless2-2022-12-05.sh | 79 + ...-lstm-transducer-stateless2-2022-09-03.sh} | 0 ...ormer-transducer-stateless2-2022-12-05.yml | 77 + ...-lstm-transducer-stateless2-2022-09-03.yml | 2 +- .../emformer2.py | 1798 +++++++++++++++++ .../export-for-ncnn.py | 335 +++ .../jit_pretrained.py | 292 +++ .../lstmp.py | 1 + .../scaling_converter.py | 1 + .../streaming-ncnn-decode.py | 387 ++++ .../train2.py | 1128 +++++++++++ 11 files changed, 4099 insertions(+), 1 deletion(-) create mode 100755 .github/scripts/run-librispeech-conv-emformer-transducer-stateless2-2022-12-05.sh rename .github/scripts/{run-librispeech-lstm-transducer-stateless2-2022-09-03.yml => run-librispeech-lstm-transducer-stateless2-2022-09-03.sh} (100%) create mode 100644 .github/workflows/run-librispeech-conv-emformer-transducer-stateless2-2022-12-05.yml create mode 100644 egs/librispeech/ASR/conv_emformer_transducer_stateless2/emformer2.py create mode 100755 egs/librispeech/ASR/conv_emformer_transducer_stateless2/export-for-ncnn.py create mode 100755 egs/librispeech/ASR/conv_emformer_transducer_stateless2/jit_pretrained.py create mode 120000 egs/librispeech/ASR/conv_emformer_transducer_stateless2/lstmp.py create mode 120000 egs/librispeech/ASR/conv_emformer_transducer_stateless2/scaling_converter.py create mode 100755 egs/librispeech/ASR/conv_emformer_transducer_stateless2/streaming-ncnn-decode.py create mode 100755 egs/librispeech/ASR/conv_emformer_transducer_stateless2/train2.py diff --git a/.github/scripts/run-librispeech-conv-emformer-transducer-stateless2-2022-12-05.sh b/.github/scripts/run-librispeech-conv-emformer-transducer-stateless2-2022-12-05.sh new file mode 100755 index 000000000..32c939206 --- /dev/null +++ b/.github/scripts/run-librispeech-conv-emformer-transducer-stateless2-2022-12-05.sh @@ -0,0 +1,79 @@ +#!/usr/bin/env bash +# +set -e + +log() { + # This function is from espnet + local fname=${BASH_SOURCE[1]##*/} + echo -e "$(date '+%Y-%m-%d %H:%M:%S') (${fname}:${BASH_LINENO[0]}:${FUNCNAME[1]}) $*" +} + +cd egs/librispeech/ASR + +repo_url=https://huggingface.co/Zengwei/icefall-asr-librispeech-conv-emformer-transducer-stateless2-2022-07-05 + +log "Downloading pre-trained model from $repo_url" +GIT_LFS_SKIP_SMUDGE=1 git clone $repo_url +repo=$(basename $repo_url) +pushd $repo +git lfs pull --include "exp/pretrained-epoch-30-avg-10-averaged.pt" +git lfs pull --include "data/lang_bpe_500/bpe.model" +cd exp +ln -s pretrained-epoch-30-avg-10-averaged.pt epoch-99.pt +popd + +log "Display test files" +tree $repo/ +soxi $repo/test_wavs/*.wav +ls -lh $repo/test_wavs/*.wav + +log "Install ncnn and pnnx" + +# We are using a modified ncnn here. Will try to merge it to the official repo +# of ncnn +git clone https://github.com/csukuangfj/ncnn +pushd ncnn +git submodule init +git submodule update python/pybind11 +python3 setup.py bdist_wheel +ls -lh dist/ +pip install dist/*.whl +cd tools/pnnx +mkdir build +cd build +cmake -D Python3_EXECUTABLE=/opt/hostedtoolcache/Python/3.8.14/x64/bin/python3 .. +make -j4 pnnx + +./src/pnnx || echo "pass" + +popd + +log "Test exporting to pnnx format" + +./conv_emformer_transducer_stateless2/export-for-ncnn.py \ + --exp-dir $repo/exp \ + --bpe-model $repo/data/lang_bpe_500/bpe.model \ + --epoch 99 \ + --avg 1 \ + --use-averaged-model 0 \ + \ + --num-encoder-layers 12 \ + --chunk-length 32 \ + --cnn-module-kernel 31 \ + --left-context-length 32 \ + --right-context-length 8 \ + --memory-size 32 + +./ncnn/tools/pnnx/build/src/pnnx $repo/exp/encoder_jit_trace-pnnx.pt +./ncnn/tools/pnnx/build/src/pnnx $repo/exp/decoder_jit_trace-pnnx.pt +./ncnn/tools/pnnx/build/src/pnnx $repo/exp/joiner_jit_trace-pnnx.pt + +./conv_emformer_transducer_stateless2/streaming-ncnn-decode.py \ + --tokens $repo/data/lang_bpe_500/tokens.txt \ + --encoder-param-filename $repo/exp/encoder_jit_trace-pnnx.ncnn.param \ + --encoder-bin-filename $repo/exp/encoder_jit_trace-pnnx.ncnn.bin \ + --decoder-param-filename $repo/exp/decoder_jit_trace-pnnx.ncnn.param \ + --decoder-bin-filename $repo/exp/decoder_jit_trace-pnnx.ncnn.bin \ + --joiner-param-filename $repo/exp/joiner_jit_trace-pnnx.ncnn.param \ + --joiner-bin-filename $repo/exp/joiner_jit_trace-pnnx.ncnn.bin \ + $repo/test_wavs/1089-134686-0001.wav diff --git a/.github/scripts/run-librispeech-lstm-transducer-stateless2-2022-09-03.yml b/.github/scripts/run-librispeech-lstm-transducer-stateless2-2022-09-03.sh similarity index 100% rename from .github/scripts/run-librispeech-lstm-transducer-stateless2-2022-09-03.yml rename to .github/scripts/run-librispeech-lstm-transducer-stateless2-2022-09-03.sh diff --git a/.github/workflows/run-librispeech-conv-emformer-transducer-stateless2-2022-12-05.yml b/.github/workflows/run-librispeech-conv-emformer-transducer-stateless2-2022-12-05.yml new file mode 100644 index 000000000..b9a1582c4 --- /dev/null +++ b/.github/workflows/run-librispeech-conv-emformer-transducer-stateless2-2022-12-05.yml @@ -0,0 +1,77 @@ +name: run-librispeech-conv-emformer-transducer-stateless2-2022-12-05 + +on: + push: + branches: + - master + pull_request: + types: [labeled] + + schedule: + # minute (0-59) + # hour (0-23) + # day of the month (1-31) + # month (1-12) + # day of the week (0-6) + # nightly build at 15:50 UTC time every day + - cron: "50 15 * * *" + +jobs: + run_librispeech_conv_emformer_transducer_stateless2_2022_12_05: + if: github.event.label.name == 'ready' || github.event.label.name == 'ncnn' || github.event_name == 'push' || github.event_name == 'schedule' + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest] + python-version: [3.8] + + fail-fast: false + + steps: + - uses: actions/checkout@v2 + with: + fetch-depth: 0 + + - name: Setup Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + cache: 'pip' + cache-dependency-path: '**/requirements-ci.txt' + + - name: Install Python dependencies + run: | + grep -v '^#' ./requirements-ci.txt | grep -v kaldifst | xargs -n 1 -L 1 pip install + pip uninstall -y protobuf + pip install --no-binary protobuf protobuf + + - name: Cache kaldifeat + id: my-cache + uses: actions/cache@v2 + with: + path: | + ~/tmp/kaldifeat + key: cache-tmp-${{ matrix.python-version }}-2022-09-25 + + - name: Install kaldifeat + if: steps.my-cache.outputs.cache-hit != 'true' + shell: bash + run: | + .github/scripts/install-kaldifeat.sh + + - name: Inference with pre-trained model + shell: bash + env: + GITHUB_EVENT_NAME: ${{ github.event_name }} + GITHUB_EVENT_LABEL_NAME: ${{ github.event.label.name }} + run: | + mkdir -p egs/librispeech/ASR/data + ln -sfv ~/tmp/fbank-libri egs/librispeech/ASR/data/fbank + ls -lh egs/librispeech/ASR/data/* + + sudo apt-get -qq install git-lfs tree sox + export PYTHONPATH=$PWD:$PYTHONPATH + export PYTHONPATH=~/tmp/kaldifeat/kaldifeat/python:$PYTHONPATH + export PYTHONPATH=~/tmp/kaldifeat/build/lib:$PYTHONPATH + + .github/scripts/run-librispeech-conv-emformer-transducer-stateless2-2022-12-05.sh diff --git a/.github/workflows/run-librispeech-lstm-transducer-stateless2-2022-09-03.yml b/.github/workflows/run-librispeech-lstm-transducer-stateless2-2022-09-03.yml index 59f116fde..f5ee09e16 100644 --- a/.github/workflows/run-librispeech-lstm-transducer-stateless2-2022-09-03.yml +++ b/.github/workflows/run-librispeech-lstm-transducer-stateless2-2022-09-03.yml @@ -111,7 +111,7 @@ jobs: export PYTHONPATH=~/tmp/kaldifeat/kaldifeat/python:$PYTHONPATH export PYTHONPATH=~/tmp/kaldifeat/build/lib:$PYTHONPATH - .github/scripts/run-librispeech-lstm-transducer-stateless2-2022-09-03.yml + .github/scripts/run-librispeech-lstm-transducer-stateless2-2022-09-03.sh - name: Display decoding results for lstm_transducer_stateless2 if: github.event_name == 'schedule' diff --git a/egs/librispeech/ASR/conv_emformer_transducer_stateless2/emformer2.py b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/emformer2.py new file mode 100644 index 000000000..65a7efa77 --- /dev/null +++ b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/emformer2.py @@ -0,0 +1,1798 @@ +# Copyright 2022 Xiaomi Corporation (Author: Zengwei Yao) +# +# See ../../../../LICENSE for clarification regarding multiple authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# It is modified based on +# 1) https://github.com/pytorch/audio/blob/main/torchaudio/models/emformer.py # noqa +# 2) https://github.com/pytorch/audio/blob/main/torchaudio/prototype/models/conv_emformer.py # noqa + +import math +from typing import List, Optional, Tuple + +import torch +import torch.nn as nn +from encoder_interface import EncoderInterface +from scaling import ( + ActivationBalancer, + BasicNorm, + DoubleSwish, + ScaledConv1d, + ScaledConv2d, + ScaledLinear, +) + +from icefall.utils import make_pad_mask + +LOG_EPSILON = math.log(1e-10) + + +def unstack_states( + states: Tuple[List[List[torch.Tensor]], List[torch.Tensor]] +) -> List[Tuple[List[List[torch.Tensor]], List[torch.Tensor]]]: + """Unstack the emformer state corresponding to a batch of utterances + into a list of states, where the i-th entry is the state from the i-th + utterance in the batch. + + Args: + states: + A tuple of 2 elements. + ``states[0]`` is the attention caches of a batch of utterance. + ``states[1]`` is the convolution caches of a batch of utterance. + ``len(states[0])`` and ``len(states[1])`` both eqaul to number of layers. # noqa + + Returns: + A list of states. + ``states[i]`` is a tuple of 2 elements of i-th utterance. + ``states[i][0]`` is the attention caches of i-th utterance. + ``states[i][1]`` is the convolution caches of i-th utterance. + ``len(states[i][0])`` and ``len(states[i][1])`` both eqaul to number of layers. # noqa + """ + + attn_caches, conv_caches = states + batch_size = conv_caches[0].size(0) + num_layers = len(attn_caches) + + list_attn_caches = [None] * batch_size + for i in range(batch_size): + list_attn_caches[i] = [[] for _ in range(num_layers)] + for li, layer in enumerate(attn_caches): + for s in layer: + s_list = s.unbind(dim=1) + for bi, b in enumerate(list_attn_caches): + b[li].append(s_list[bi]) + + list_conv_caches = [None] * batch_size + for i in range(batch_size): + list_conv_caches[i] = [None] * num_layers + for li, layer in enumerate(conv_caches): + c_list = layer.unbind(dim=0) + for bi, b in enumerate(list_conv_caches): + b[li] = c_list[bi] + + ans = [None] * batch_size + for i in range(batch_size): + ans[i] = [list_attn_caches[i], list_conv_caches[i]] + + return ans + + +def stack_states( + state_list: List[Tuple[List[List[torch.Tensor]], List[torch.Tensor]]] +) -> Tuple[List[List[torch.Tensor]], List[torch.Tensor]]: + """Stack list of emformer states that correspond to separate utterances + into a single emformer state so that it can be used as an input for + emformer when those utterances are formed into a batch. + + Note: + It is the inverse of :func:`unstack_states`. + + Args: + state_list: + Each element in state_list corresponding to the internal state + of the emformer model for a single utterance. + ``states[i]`` is a tuple of 2 elements of i-th utterance. + ``states[i][0]`` is the attention caches of i-th utterance. + ``states[i][1]`` is the convolution caches of i-th utterance. + ``len(states[i][0])`` and ``len(states[i][1])`` both eqaul to number of layers. # noqa + + Returns: + A new state corresponding to a batch of utterances. + See the input argument of :func:`unstack_states` for the meaning + of the returned tensor. + """ + batch_size = len(state_list) + + attn_caches = [] + for layer in state_list[0][0]: + if batch_size > 1: + # Note: We will stack attn_caches[layer][s][] later to get attn_caches[layer][s] # noqa + attn_caches.append([[s] for s in layer]) + else: + attn_caches.append([s.unsqueeze(1) for s in layer]) + for b, states in enumerate(state_list[1:], 1): + for li, layer in enumerate(states[0]): + for si, s in enumerate(layer): + attn_caches[li][si].append(s) + if b == batch_size - 1: + attn_caches[li][si] = torch.stack(attn_caches[li][si], dim=1) + + conv_caches = [] + for layer in state_list[0][1]: + if batch_size > 1: + # Note: We will stack conv_caches[layer][] later to get conv_caches[layer] # noqa + conv_caches.append([layer]) + else: + conv_caches.append(layer.unsqueeze(0)) + for b, states in enumerate(state_list[1:], 1): + for li, layer in enumerate(states[1]): + conv_caches[li].append(layer) + if b == batch_size - 1: + conv_caches[li] = torch.stack(conv_caches[li], dim=0) + + return [attn_caches, conv_caches] + + +class ConvolutionModule(nn.Module): + """ConvolutionModule. + + Modified from https://github.com/pytorch/audio/blob/main/torchaudio/prototype/models/conv_emformer.py # noqa + + Args: + chunk_length (int): + Length of each chunk. + right_context_length (int): + Length of right context. + channels (int): + The number of input channels and output channels of conv layers. + kernel_size (int): + Kernerl size of conv layers. + bias (bool): + Whether to use bias in conv layers (default=True). + """ + + def __init__( + self, + chunk_length: int, + right_context_length: int, + channels: int, + kernel_size: int, + bias: bool = True, + ) -> None: + """Construct an ConvolutionModule object.""" + super().__init__() + # kernerl_size should be an odd number for 'SAME' padding + assert (kernel_size - 1) % 2 == 0, kernel_size + + self.chunk_length = chunk_length + self.right_context_length = right_context_length + self.channels = channels + + self.pointwise_conv1 = ScaledConv1d( + channels, + 2 * channels, + kernel_size=1, + stride=1, + padding=0, + bias=bias, + ) + # After pointwise_conv1 we put x through a gated linear unit + # (nn.functional.glu). + # For most layers the normal rms value of channels of x seems to be in + # the range 1 to 4, but sometimes, for some reason, for layer 0 the rms + # ends up being very large, between 50 and 100 for different channels. + # This will cause very peaky and sparse derivatives for the sigmoid + # gating function, which will tend to make the loss function not learn + # effectively. (for most layers the average absolute values are in the + # range 0.5..9.0, and the average p(x>0), i.e. positive proportion, + # at the output of pointwise_conv1.output is around 0.35 to 0.45 for + # different layers, which likely breaks down as 0.5 for the "linear" + # half and 0.2 to 0.3 for the part that goes into the sigmoid. + # The idea is that if we constrain the rms values to a reasonable range + # via a constraint of max_abs=10.0, it will be in a better position to + # start learning something, i.e. to latch onto the correct range. + self.deriv_balancer1 = ActivationBalancer( + channel_dim=1, max_abs=10.0, min_positive=0.05, max_positive=1.0 + ) + + # make it causal by padding cached (kernel_size - 1) frames on the left + self.cache_size = kernel_size - 1 + self.depthwise_conv = ScaledConv1d( + channels, + channels, + kernel_size, + stride=1, + padding=0, + groups=channels, + bias=bias, + ) + + self.deriv_balancer2 = ActivationBalancer( + channel_dim=1, min_positive=0.05, max_positive=1.0 + ) + + self.activation = DoubleSwish() + + self.pointwise_conv2 = ScaledConv1d( + channels, + channels, + kernel_size=1, + stride=1, + padding=0, + bias=bias, + initial_scale=0.25, + ) + + def _split_right_context( + self, + pad_utterance: torch.Tensor, + right_context: torch.Tensor, + ) -> torch.Tensor: + """ + Args: + pad_utterance: + Its shape is (cache_size + U, B, D). + right_context: + Its shape is (R, B, D). + + Returns: + Right context segments padding with corresponding context. + Its shape is (num_segs * B, D, cache_size + right_context_length). + """ + U_, B, D = pad_utterance.size() + R = right_context.size(0) + assert self.right_context_length != 0 + assert R % self.right_context_length == 0 + num_chunks = R // self.right_context_length + right_context = right_context.reshape( + num_chunks, self.right_context_length, B, D + ) + right_context = right_context.permute(0, 2, 1, 3).reshape( + num_chunks * B, self.right_context_length, D + ) + + intervals = torch.arange( + 0, self.chunk_length * (num_chunks - 1), self.chunk_length + ) + first = torch.arange(self.chunk_length, self.chunk_length + self.cache_size) + indexes = intervals.unsqueeze(1) + first.unsqueeze(0) + indexes = torch.cat( + [indexes, torch.arange(U_ - self.cache_size, U_).unsqueeze(0)] + ) + padding = pad_utterance[indexes] # (num_chunks, cache_size, B, D) + padding = padding.permute(0, 2, 1, 3).reshape( + num_chunks * B, self.cache_size, D + ) + + pad_right_context = torch.cat([padding, right_context], dim=1) + # (num_chunks * B, cache_size + right_context_length, D) + return pad_right_context.permute(0, 2, 1) + + def _merge_right_context(self, right_context: torch.Tensor, B: int) -> torch.Tensor: + """ + Args: + right_context: + Right context segments. + It shape is (num_segs * B, D, right_context_length). + B: + Batch size. + + Returns: + A tensor of shape (B, D, R), where + R = num_segs * right_context_length. + """ + right_context = right_context.reshape( + -1, B, self.channels, self.right_context_length + ) + right_context = right_context.permute(1, 2, 0, 3) + right_context = right_context.reshape(B, self.channels, -1) + return right_context + + def forward( + self, + utterance: torch.Tensor, + right_context: torch.Tensor, + ) -> Tuple[torch.Tensor, torch.Tensor]: + """Causal convolution module. + + Args: + utterance (torch.Tensor): + Utterance tensor of shape (U, B, D). + right_context (torch.Tensor): + Right context tensor of shape (R, B, D). + + Returns: + A tuple of 2 tensors: + - output utterance of shape (U, B, D). + - output right_context of shape (R, B, D). + """ + U, B, D = utterance.size() + R, _, _ = right_context.size() + + # point-wise conv and GLU mechanism + x = torch.cat([right_context, utterance], dim=0) # (R + U, B, D) + x = x.permute(1, 2, 0) # (B, D, R + U) + x = self.pointwise_conv1(x) # (B, 2 * D, R + U) + x = self.deriv_balancer1(x) + x = nn.functional.glu(x, dim=1) # (B, D, R + U) + utterance = x[:, :, R:] # (B, D, U) + right_context = x[:, :, :R] # (B, D, R) + + # make causal convolution + cache = torch.zeros(B, D, self.cache_size, device=x.device, dtype=x.dtype) + pad_utterance = torch.cat([cache, utterance], dim=2) # (B, D, cache + U) + + # depth-wise conv on utterance + utterance = self.depthwise_conv(pad_utterance) # (B, D, U) + + if self.right_context_length > 0: + # depth-wise conv on right_context + pad_right_context = self._split_right_context( + pad_utterance.permute(2, 0, 1), right_context.permute(2, 0, 1) + ) # (num_segs * B, D, cache_size + right_context_length) + right_context = self.depthwise_conv( + pad_right_context + ) # (num_segs * B, D, right_context_length) + right_context = self._merge_right_context(right_context, B) # (B, D, R) + + x = torch.cat([right_context, utterance], dim=2) # (B, D, R + U) + x = self.deriv_balancer2(x) + x = self.activation(x) + + # point-wise conv + x = self.pointwise_conv2(x) # (B, D, R + U) + + right_context = x[:, :, :R] # (B, D, R) + utterance = x[:, :, R:] # (B, D, U) + return ( + utterance.permute(2, 0, 1), + right_context.permute(2, 0, 1), + ) + + @torch.jit.export + def infer( + self, + utterance: torch.Tensor, + right_context: torch.Tensor, + cache: torch.Tensor, + ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """Causal convolution module applied on both utterance and right_context. + + Args: + utterance (torch.Tensor): + Utterance tensor of shape (U, B, D). + right_context (torch.Tensor): + Right context tensor of shape (R, B, D). + cache (torch.Tensor, optional): + Cached tensor for left padding of shape (B, D, cache_size). + + Returns: + A tuple of 3 tensors: + - output utterance of shape (U, B, D). + - output right_context of shape (R, B, D). + - updated cache tensor of shape (B, D, cache_size). + """ + # U, B, D = utterance.size() + # R, _, _ = right_context.size() + U = self.chunk_length + B = 1 + D = self.channels + R = self.right_context_length + + # point-wise conv + x = torch.cat([utterance, right_context], dim=0) # (U + R, B, D) + x = x.permute(1, 2, 0) # (B, D, U + R) + x = self.pointwise_conv1(x) # (B, 2 * D, U + R) + x = self.deriv_balancer1(x) + x = nn.functional.glu(x, dim=1) # (B, D, U + R) + + # make causal convolution + assert cache.shape == (B, D, self.cache_size), cache.shape + x = torch.cat([cache, x], dim=2) # (B, D, cache_size + U + R) + # update cache + new_cache = x[:, :, -R - self.cache_size : -R] + + # 1-D depth-wise conv + x = self.depthwise_conv(x) # (B, D, U + R) + + x = self.deriv_balancer2(x) + x = self.activation(x) + + # point-wise conv + x = self.pointwise_conv2(x) # (B, D, U + R) + + utterance = x[:, :, :U] # (B, D, U) + right_context = x[:, :, U:] # (B, D, R) + return ( + utterance.permute(2, 0, 1), + right_context.permute(2, 0, 1), + new_cache, + ) + + +class EmformerAttention(nn.Module): + r"""Emformer layer attention module. + + Args: + embed_dim (int): + Embedding dimension. + nhead (int): + Number of attention heads in each Emformer layer. + dropout (float, optional): + Dropout probability. (Default: 0.0) + tanh_on_mem (bool, optional): + If ``True``, applies tanh to memory elements. (Default: ``False``) + negative_inf (float, optional): + Value to use for negative infinity in attention weights. (Default: -1e8) + """ + + def __init__( + self, + embed_dim: int, + nhead: int, + left_context_length: int, + chunk_length: int, + right_context_length: int, + memory_size: int, + dropout: float = 0.0, + tanh_on_mem: bool = False, + negative_inf: float = -1e8, + ): + super().__init__() + + if embed_dim % nhead != 0: + raise ValueError( + f"embed_dim ({embed_dim}) is not a multiple of nhead ({nhead})." + ) + + self.embed_dim = embed_dim + self.nhead = nhead + self.tanh_on_mem = tanh_on_mem + self.negative_inf = negative_inf + self.head_dim = embed_dim // nhead + self.dropout = dropout + + self.left_context_length = left_context_length + self.right_context_length = right_context_length + self.chunk_length = chunk_length + self.memory_size = memory_size + + self.emb_to_key_value = ScaledLinear(embed_dim, 2 * embed_dim, bias=True) + self.emb_to_query = ScaledLinear(embed_dim, embed_dim, bias=True) + self.out_proj = ScaledLinear( + embed_dim, embed_dim, bias=True, initial_scale=0.25 + ) + + def _gen_attention_probs( + self, + attention_weights: torch.Tensor, + attention_mask: torch.Tensor, + padding_mask: Optional[torch.Tensor] = None, + ) -> torch.Tensor: + """Given the entire attention weights, mask out unecessary connections + and optionally with padding positions, to obtain underlying chunk-wise + attention probabilities. + + B: batch size; + Q: length of query; + KV: length of key and value. + + Args: + attention_weights (torch.Tensor): + Attention weights computed on the entire concatenated tensor + with shape (B * nhead, Q, KV). + attention_mask (torch.Tensor): + Mask tensor where chunk-wise connections are filled with `False`, + and other unnecessary connections are filled with `True`, + with shape (Q, KV). + padding_mask (torch.Tensor, optional): + Mask tensor where the padding positions are fill with `True`, + and other positions are filled with `False`, with shapa `(B, KV)`. + + Returns: + A tensor of shape (B * nhead, Q, KV). + """ + attention_weights_float = attention_weights.float() + attention_weights_float = attention_weights_float.masked_fill( + attention_mask.unsqueeze(0), self.negative_inf + ) + if padding_mask is not None: + Q = attention_weights.size(1) + B = attention_weights.size(0) // self.nhead + attention_weights_float = attention_weights_float.view(B, self.nhead, Q, -1) + attention_weights_float = attention_weights_float.masked_fill( + padding_mask.unsqueeze(1).unsqueeze(2).to(torch.bool), + self.negative_inf, + ) + attention_weights_float = attention_weights_float.view( + B * self.nhead, Q, -1 + ) + + attention_probs = nn.functional.softmax( + attention_weights_float, dim=-1 + ).type_as(attention_weights) + + attention_probs = nn.functional.dropout( + attention_probs, p=self.dropout, training=self.training + ) + return attention_probs + + def _forward_impl( + self, + utterance: torch.Tensor, + right_context: torch.Tensor, + memory: torch.Tensor, + attention_mask: torch.Tensor, + padding_mask: Optional[torch.Tensor] = None, + left_context_key: Optional[torch.Tensor] = None, + left_context_val: Optional[torch.Tensor] = None, + ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """Underlying chunk-wise attention implementation.""" + # U, B, _ = utterance.size() + # R = right_context.size(0) + # M = memory.size(0) + + U = self.chunk_length + B = 1 + R = self.right_context_length + M = self.memory_size + L = self.left_context_length + + scaling = float(self.head_dim) ** -0.5 + + # compute query with [right_context, utterance]. + query = self.emb_to_query(torch.cat([right_context, utterance])) + # compute key and value with [memory, right_context, utterance]. + key, value = self.emb_to_key_value( + torch.cat([memory, right_context, utterance]) + ).chunk(chunks=2, dim=2) + + if left_context_key is not None and left_context_val is not None: + # now compute key and value with + # [memory, right context, left context, uttrance] + # this is used in inference mode + key = torch.cat([key[: M + R], left_context_key, key[M + R :]]) + value = torch.cat([value[: M + R], left_context_val, value[M + R :]]) + + # Q = query.size(0) + Q = U + R + + # KV = key.size(0) + + reshaped_query = query.view(Q, self.nhead, self.head_dim).permute(1, 0, 2) + reshaped_key = key.view(M + R + U + L, self.nhead, self.head_dim).permute( + 1, 0, 2 + ) + reshaped_value = value.view(M + R + U + L, self.nhead, self.head_dim).permute( + 1, 0, 2 + ) + + # reshaped_query, reshaped_key, reshaped_value = [ + # tensor.contiguous().view(-1, B * self.nhead, self.head_dim).transpose(0, 1) + # for tensor in [query, key, value] + # ] # (B * nhead, Q or KV, head_dim) + attention_weights = torch.bmm( + reshaped_query * scaling, reshaped_key.permute(0, 2, 1) + ) # (B * nhead, Q, KV) + + # compute attention probabilities + if False: + attention_probs = self._gen_attention_probs( + attention_weights, attention_mask, padding_mask + ) + else: + attention_probs = nn.functional.softmax(attention_weights, dim=-1) + + # compute attention outputs + attention = torch.bmm(attention_probs, reshaped_value) + assert attention.shape == (B * self.nhead, Q, self.head_dim) + attention = attention.permute(1, 0, 2).reshape(-1, self.embed_dim) + # TODO(fangjun): ncnn does not support reshape(-1, 1, self.embed_dim) + # We have to change InnerProduct in ncnn to ignore the extra dim below + attention = attention.unsqueeze(1) + + # apply output projection + output_right_context_utterance = self.out_proj(attention) + # The return shape of output_right_context_utterance is (10, 1, 512) + + return output_right_context_utterance, key, value + + def forward( + self, + utterance: torch.Tensor, + right_context: torch.Tensor, + memory: torch.Tensor, + attention_mask: torch.Tensor, + padding_mask: Optional[torch.Tensor] = None, + ) -> torch.Tensor: + # TODO: Modify docs. + """Forward pass for training and validation mode. + + B: batch size; + D: embedding dimension; + R: length of the hard-copied right contexts; + U: length of full utterance; + M: length of memory vectors. + + It computes a `big` attention matrix on full utterance and + then utilizes a pre-computed mask to simulate chunk-wise attention. + + It concatenates three blocks: hard-copied right contexts, + and full utterance, as a `big` block, + to compute the query tensor: + query = [right_context, utterance], + with length Q = R + U. + It concatenates the three blocks: memory vectors, + hard-copied right contexts, and full utterance as another `big` block, + to compute the key and value tensors: + key & value = [memory, right_context, utterance], + with length KV = M + R + U. + Attention scores is computed with above `big` query and key. + + Then the underlying chunk-wise attention is obtained by applying + the attention mask. Suppose + c_i: chunk at index i; + r_i: right context that c_i can use; + l_i: left context that c_i can use; + m_i: past memory vectors from previous layer that c_i can use; + The target chunk-wise attention is: + c_i, r_i (in query) -> l_i, c_i, r_i, m_i (in key) + + Args: + utterance (torch.Tensor): + Full utterance frames, with shape (U, B, D). + right_context (torch.Tensor): + Hard-copied right context frames, with shape (R, B, D), + where R = num_chunks * right_context_length + memory (torch.Tensor): + Memory elements, with shape (M, B, D), where M = num_chunks - 1. + It is an empty tensor without using memory. + attention_mask (torch.Tensor): + Pre-computed attention mask to simulate underlying chunk-wise + attention, with shape (Q, KV). + padding_mask (torch.Tensor): + Padding mask of key tensor, with shape (B, KV). + + Returns: + Output of right context and utterance, with shape (R + U, B, D). + """ + output_right_context_utterance, _, _ = self._forward_impl( + utterance, + right_context, + memory, + attention_mask, + padding_mask=padding_mask, + ) + return output_right_context_utterance + + @torch.jit.export + def infer( + self, + utterance: torch.Tensor, + right_context: torch.Tensor, + memory: torch.Tensor, + left_context_key: torch.Tensor, + left_context_val: torch.Tensor, + padding_mask: Optional[torch.Tensor] = None, + ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """Forward pass for inference. + + B: batch size; + D: embedding dimension; + R: length of right context; + U: length of utterance, i.e., current chunk; + L: length of cached left context; + M: length of cached memory vectors. + + It concatenates the right context and utterance (i.e., current chunk) + of current chunk, to compute the query tensor: + query = [right_context, utterance], + with length Q = R + U. + It concatenates the memory vectors, right context, left context, and + current chunk, to compute the key and value tensors: + key & value = [memory, right_context, left_context, utterance], + with length KV = M + R + L + U. + + The chunk-wise attention is: + chunk, right context (in query) -> + left context, chunk, right context, memory vectors (in key). + + Args: + utterance (torch.Tensor): + Current chunk frames, with shape (U, B, D), where U = chunk_length. + right_context (torch.Tensor): + Right context frames, with shape (R, B, D), + where R = right_context_length. + memory (torch.Tensor): + Memory vectors, with shape (M, B, D), or empty tensor. + left_context_key (torch,Tensor): + Cached attention key of left context from preceding computation, + with shape (L, B, D). + left_context_val (torch.Tensor): + Cached attention value of left context from preceding computation, + with shape (L, B, D). + padding_mask (torch.Tensor): + Padding mask of key tensor, with shape (B, KV). + + Returns: + A tuple containing 4 tensors: + - output of right context and utterance, with shape (R + U, B, D). + - attention key of left context and utterance, which would be cached + for next computation, with shape (L + U, B, D). + - attention value of left context and utterance, which would be + cached for next computation, with shape (L + U, B, D). + """ + # U = utterance.size(0) + # R = right_context.size(0) + # L = left_context_key.size(0) + # M = memory.size(0) + + U = self.chunk_length + R = self.right_context_length + L = self.left_context_length + M = self.memory_size + + # query = [right context, utterance] + Q = R + U + # key, value = [memory, right context, left context, utterance] + KV = M + R + L + U + attention_mask = torch.zeros(Q, KV).to( + dtype=torch.bool, device=utterance.device + ) + + output_right_context_utterance, key, value = self._forward_impl( + utterance, + right_context, + memory, + attention_mask, + padding_mask=padding_mask, + left_context_key=left_context_key, + left_context_val=left_context_val, + ) + return ( + output_right_context_utterance, + key[M + R :], + value[M + R :], + ) + + +class EmformerEncoderLayer(nn.Module): + """Emformer layer that constitutes Emformer. + + Args: + d_model (int): + Input dimension. + nhead (int): + Number of attention heads. + dim_feedforward (int): + Hidden layer dimension of feedforward network. + chunk_length (int): + Length of each input segment. + dropout (float, optional): + Dropout probability. (Default: 0.0) + layer_dropout (float, optional): + Layer dropout probability. (Default: 0.0) + cnn_module_kernel (int): + Kernel size of convolution module. + left_context_length (int, optional): + Length of left context. (Default: 0) + right_context_length (int, optional): + Length of right context. (Default: 0) + memory_size (int, optional): + Number of memory elements to use. (Default: 0) + tanh_on_mem (bool, optional): + If ``True``, applies tanh to memory elements. (Default: ``False``) + negative_inf (float, optional): + Value to use for negative infinity in attention weights. (Default: -1e8) + """ + + def __init__( + self, + d_model: int, + nhead: int, + dim_feedforward: int, + chunk_length: int, + dropout: float = 0.1, + layer_dropout: float = 0.075, + cnn_module_kernel: int = 31, + left_context_length: int = 0, + right_context_length: int = 0, + memory_size: int = 0, + tanh_on_mem: bool = False, + negative_inf: float = -1e8, + ): + super().__init__() + + self.attention = EmformerAttention( + embed_dim=d_model, + nhead=nhead, + left_context_length=left_context_length, + chunk_length=chunk_length, + memory_size=memory_size, + right_context_length=right_context_length, + dropout=dropout, + tanh_on_mem=tanh_on_mem, + negative_inf=negative_inf, + ) + self.summary_op = nn.AvgPool1d( + kernel_size=chunk_length, stride=chunk_length, ceil_mode=True + ) + + self.feed_forward_macaron = nn.Sequential( + ScaledLinear(d_model, dim_feedforward), + ActivationBalancer(channel_dim=-1), + DoubleSwish(), + nn.Dropout(dropout), + ScaledLinear(dim_feedforward, d_model, initial_scale=0.25), + ) + + self.feed_forward = nn.Sequential( + ScaledLinear(d_model, dim_feedforward), + ActivationBalancer(channel_dim=-1), + DoubleSwish(), + nn.Dropout(dropout), + ScaledLinear(dim_feedforward, d_model, initial_scale=0.25), + ) + + self.conv_module = ConvolutionModule( + chunk_length, + right_context_length, + d_model, + cnn_module_kernel, + ) + + self.norm_final = BasicNorm(d_model) + + # try to ensure the output is close to zero-mean + # (or at least, zero-median). + self.balancer = ActivationBalancer( + channel_dim=-1, min_positive=0.45, max_positive=0.55, max_abs=6.0 + ) + + self.dropout = nn.Dropout(dropout) + + self.layer_dropout = layer_dropout + self.left_context_length = left_context_length + self.right_context_length = right_context_length + self.chunk_length = chunk_length + self.memory_size = memory_size + self.d_model = d_model + self.use_memory = memory_size > 0 + + def _update_attn_cache( + self, + next_key: torch.Tensor, + next_val: torch.Tensor, + memory: torch.Tensor, + attn_cache: List[torch.Tensor], + ) -> List[torch.Tensor]: + """Update cached attention state: + 1) output memory of current chunk in the lower layer; + 2) attention key and value in current chunk's computation, which would + be reused in next chunk's computation. + """ + # attn_cache[0].shape (self.memory_size, 1, 512) + # memory.shape (1, 1, 512) + # attn_cache[1].shape (self.left_context_length, 1, 512) + # attn_cache[2].shape (self.left_context_length, 1, 512) + # next_key.shape (self.left_context_length + self.right_context_utterance, 1, 512) + # next_value.shape (self.left_context_length + self.right_context_utterance, 1, 512) + new_memory = torch.cat([attn_cache[0], memory]) + # TODO(fangjun): Remove torch.cat + # new_key = torch.cat([attn_cache[1], next_key]) + # new_val = torch.cat([attn_cache[2], next_val]) + attn_cache[0] = new_memory[1:] + attn_cache[1] = next_key[-self.left_context_length :] + attn_cache[2] = next_val[-self.left_context_length :] + return attn_cache + + def _apply_conv_module_forward( + self, + right_context_utterance: torch.Tensor, + R: int, + ) -> torch.Tensor: + """Apply convolution module in training and validation mode.""" + utterance = right_context_utterance[R:] + right_context = right_context_utterance[:R] + utterance, right_context = self.conv_module(utterance, right_context) + right_context_utterance = torch.cat([right_context, utterance]) + return right_context_utterance + + def _apply_conv_module_infer( + self, + right_context_utterance: torch.Tensor, + R: int, + conv_cache: torch.Tensor, + ) -> Tuple[torch.Tensor, torch.Tensor]: + """Apply convolution module on utterance in inference mode.""" + utterance = right_context_utterance[R:] + right_context = right_context_utterance[:R] + utterance, right_context, conv_cache = self.conv_module.infer( + utterance, right_context, conv_cache + ) + right_context_utterance = torch.cat([right_context, utterance]) + return right_context_utterance, conv_cache + + def _apply_attention_module_forward( + self, + right_context_utterance: torch.Tensor, + R: int, + attention_mask: torch.Tensor, + padding_mask: Optional[torch.Tensor] = None, + ) -> torch.Tensor: + """Apply attention module in training and validation mode.""" + utterance = right_context_utterance[R:] + right_context = right_context_utterance[:R] + + if self.use_memory: + memory = self.summary_op(utterance.permute(1, 2, 0)).permute(2, 0, 1)[ + :-1, :, : + ] + else: + memory = torch.empty(0).to(dtype=utterance.dtype, device=utterance.device) + output_right_context_utterance = self.attention( + utterance=utterance, + right_context=right_context, + memory=memory, + attention_mask=attention_mask, + padding_mask=padding_mask, + ) + + return output_right_context_utterance + + def _apply_attention_module_infer( + self, + right_context_utterance: torch.Tensor, + R: int, + attn_cache: List[torch.Tensor], + padding_mask: Optional[torch.Tensor] = None, + ) -> Tuple[torch.Tensor, List[torch.Tensor]]: + """Apply attention module in inference mode. + 1) Unpack cached states including: + - memory from previous chunks; + - attention key and value of left context from preceding + chunk's compuation; + 2) Apply attention computation; + 3) Update cached attention states including: + - memory of current chunk; + - attention key and value in current chunk's computation, which would + be resued in next chunk's computation. + """ + utterance = right_context_utterance[R:] + right_context = right_context_utterance[:R] + + pre_memory = attn_cache[0] + left_context_key = attn_cache[1] + left_context_val = attn_cache[2] + + if self.use_memory: + memory = torch.mean(utterance, dim=0, keepdim=True) + + # memory = self.summary_op(utterance.permute(1, 2, 0)).permute(2, 0, 1)[ + # :1, :, : + # ] + else: + memory = torch.empty(0).to(dtype=utterance.dtype, device=utterance.device) + (output_right_context_utterance, next_key, next_val) = self.attention.infer( + utterance=utterance, + right_context=right_context, + memory=pre_memory, + left_context_key=left_context_key, + left_context_val=left_context_val, + padding_mask=padding_mask, + ) + attn_cache = self._update_attn_cache(next_key, next_val, memory, attn_cache) + return output_right_context_utterance, attn_cache + + def forward( + self, + utterance: torch.Tensor, + right_context: torch.Tensor, + attention_mask: torch.Tensor, + padding_mask: Optional[torch.Tensor] = None, + warmup: float = 1.0, + ) -> Tuple[torch.Tensor, torch.Tensor]: + r"""Forward pass for training and validation mode. + + B: batch size; + D: embedding dimension; + R: length of hard-copied right contexts; + U: length of full utterance; + M: length of memory vectors. + + Args: + utterance (torch.Tensor): + Utterance frames, with shape (U, B, D). + right_context (torch.Tensor): + Right context frames, with shape (R, B, D). + attention_mask (torch.Tensor): + Attention mask for underlying attention module, + with shape (Q, KV), where Q = R + U, KV = M + R + U. + padding_mask (torch.Tensor): + Padding mask of ker tensor, with shape (B, KV). + + Returns: + A tuple containing 2 tensors: + - output utterance, with shape (U, B, D). + - output right context, with shape (R, B, D). + """ + R = right_context.size(0) + src = torch.cat([right_context, utterance]) + src_orig = src + + warmup_scale = min(0.1 + warmup, 1.0) + # alpha = 1.0 means fully use this encoder layer, 0.0 would mean + # completely bypass it. + if self.training: + alpha = ( + warmup_scale + if torch.rand(()).item() <= (1.0 - self.layer_dropout) + else 0.1 + ) + else: + alpha = 1.0 + + # macaron style feed forward module + src = src + self.dropout(self.feed_forward_macaron(src)) + + # emformer attention module + src_att = self._apply_attention_module_forward( + src, R, attention_mask, padding_mask=padding_mask + ) + src = src + self.dropout(src_att) + + # convolution module + src_conv = self._apply_conv_module_forward(src, R) + src = src + self.dropout(src_conv) + + # feed forward module + src = src + self.dropout(self.feed_forward(src)) + + src = self.norm_final(self.balancer(src)) + + if alpha != 1.0: + src = alpha * src + (1 - alpha) * src_orig + + output_utterance = src[R:] + output_right_context = src[:R] + return output_utterance, output_right_context + + @torch.jit.export + def infer( + self, + utterance: torch.Tensor, + right_context: torch.Tensor, + cache: List[torch.Tensor], + padding_mask: Optional[torch.Tensor] = None, + ) -> Tuple[torch.Tensor, torch.Tensor, List[torch.Tensor]]: + """Forward pass for inference. + + B: batch size; + D: embedding dimension; + R: length of right_context; + U: length of utterance; + M: length of memory. + + Args: + utterance (torch.Tensor): + Utterance frames, with shape (U, B, D). + right_context (torch.Tensor): + Right context frames, with shape (R, B, D). + attn_cache (List[torch.Tensor]): + Cached attention tensors generated in preceding computation, + including memory, key and value of left context. + conv_cache (torch.Tensor, optional): + Cache tensor of left context for causal convolution. + padding_mask (torch.Tensor): + Padding mask of ker tensor. + + Returns: + (Tensor, Tensor, List[torch.Tensor], Tensor): + - output utterance, with shape (U, B, D); + - output right_context, with shape (R, B, D); + - output attention cache; + - output convolution cache. + """ + R = self.right_context_length + src = torch.cat([right_context, utterance]) + attn_cache = cache[:3] + conv_cache = cache[3] + + # macaron style feed forward module + src = src + self.dropout(self.feed_forward_macaron(src)) + + # emformer attention module + src_att, attn_cache = self._apply_attention_module_infer( + src, R, attn_cache, padding_mask=padding_mask + ) + src = src + self.dropout(src_att) + + # convolution module + src_conv, conv_cache = self._apply_conv_module_infer(src, R, conv_cache) + src = src + self.dropout(src_conv) + + # feed forward module + src = src + self.dropout(self.feed_forward(src)) + + src = self.norm_final(self.balancer(src)) + + output_utterance = src[R:] + output_right_context = src[:R] + return (output_utterance, output_right_context, attn_cache + [conv_cache]) + + +def _gen_attention_mask_block( + col_widths: List[int], + col_mask: List[bool], + num_rows: int, + device: torch.device, +) -> torch.Tensor: + assert len(col_widths) == len( + col_mask + ), "Length of col_widths must match that of col_mask" + + mask_block = [ + torch.ones(num_rows, col_width, device=device) + if is_ones_col + else torch.zeros(num_rows, col_width, device=device) + for col_width, is_ones_col in zip(col_widths, col_mask) + ] + return torch.cat(mask_block, dim=1) + + +class EmformerEncoder(nn.Module): + """Implements the Emformer architecture introduced in + *Emformer: Efficient Memory Transformer Based Acoustic Model for Low Latency + Streaming Speech Recognition* + [:footcite:`shi2021emformer`]. + + In this model, the memory bank computation is simplifed, using the averaged + value of each chunk as its memory vector. + + Args: + d_model (int): + Input dimension. + nhead (int): + Number of attention heads in each emformer layer. + dim_feedforward (int): + Hidden layer dimension of each emformer layer's feedforward network. + num_encoder_layers (int): + Number of emformer layers to instantiate. + chunk_length (int): + Length of each input segment. + dropout (float, optional): + Dropout probability. (default: 0.0) + layer_dropout (float, optional): + Layer dropout probability. (default: 0.0) + cnn_module_kernel (int): + Kernel size of convolution module. + left_context_length (int, optional): + Length of left context. (default: 0) + right_context_length (int, optional): + Length of right context. (default: 0) + memory_size (int, optional): + Number of memory elements to use. (default: 0) + tanh_on_mem (bool, optional): + If ``true``, applies tanh to memory elements. (default: ``false``) + negative_inf (float, optional): + Value to use for negative infinity in attention weights. (default: -1e8) + """ + + def __init__( + self, + chunk_length: int, + d_model: int = 256, + nhead: int = 4, + dim_feedforward: int = 2048, + num_encoder_layers: int = 12, + dropout: float = 0.1, + layer_dropout: float = 0.075, + cnn_module_kernel: int = 31, + left_context_length: int = 0, + right_context_length: int = 0, + memory_size: int = 0, + tanh_on_mem: bool = False, + negative_inf: float = -1e8, + ): + super().__init__() + + assert ( + chunk_length - 1 + ) & chunk_length == 0, "chunk_length should be a power of 2." + self.shift = int(math.log(chunk_length, 2)) + + self.use_memory = memory_size > 0 + + self.emformer_layers = nn.ModuleList( + [ + EmformerEncoderLayer( + d_model=d_model, + nhead=nhead, + dim_feedforward=dim_feedforward, + chunk_length=chunk_length, + dropout=dropout, + layer_dropout=layer_dropout, + cnn_module_kernel=cnn_module_kernel, + left_context_length=left_context_length, + right_context_length=right_context_length, + memory_size=memory_size, + tanh_on_mem=tanh_on_mem, + negative_inf=negative_inf, + ) + for layer_idx in range(num_encoder_layers) + ] + ) + + self.num_encoder_layers = num_encoder_layers + self.d_model = d_model + self.left_context_length = left_context_length + self.right_context_length = right_context_length + self.chunk_length = chunk_length + self.memory_size = memory_size + self.cnn_module_kernel = cnn_module_kernel + + def _gen_right_context(self, x: torch.Tensor) -> torch.Tensor: + """Hard copy each chunk's right context and concat them.""" + T = x.shape[0] + num_chunks = math.ceil((T - self.right_context_length) / self.chunk_length) + # first (num_chunks - 1) right context block + intervals = torch.arange( + 0, self.chunk_length * (num_chunks - 1), self.chunk_length + ) + first = torch.arange( + self.chunk_length, self.chunk_length + self.right_context_length + ) + indexes = intervals.unsqueeze(1) + first.unsqueeze(0) + # cat last right context block + indexes = torch.cat( + [ + indexes, + torch.arange(T - self.right_context_length, T).unsqueeze(0), + ] + ) + right_context_blocks = x[indexes.reshape(-1)] + return right_context_blocks + + def _gen_attention_mask_col_widths(self, chunk_idx: int, U: int) -> List[int]: + """Calculate column widths (key, value) in attention mask for the + chunk_idx chunk.""" + num_chunks = math.ceil(U / self.chunk_length) + rc = self.right_context_length + lc = self.left_context_length + rc_start = chunk_idx * rc + rc_end = rc_start + rc + chunk_start = max(chunk_idx * self.chunk_length - lc, 0) + chunk_end = min((chunk_idx + 1) * self.chunk_length, U) + R = rc * num_chunks + + if self.use_memory: + m_start = max(chunk_idx - self.memory_size, 0) + M = num_chunks - 1 + col_widths = [ + m_start, # before memory + chunk_idx - m_start, # memory + M - chunk_idx, # after memory + rc_start, # before right context + rc, # right context + R - rc_end, # after right context + chunk_start, # before chunk + chunk_end - chunk_start, # chunk + U - chunk_end, # after chunk + ] + else: + col_widths = [ + rc_start, # before right context + rc, # right context + R - rc_end, # after right context + chunk_start, # before chunk + chunk_end - chunk_start, # chunk + U - chunk_end, # after chunk + ] + + return col_widths + + def _gen_attention_mask(self, utterance: torch.Tensor) -> torch.Tensor: + """Generate attention mask to simulate underlying chunk-wise attention + computation, where chunk-wise connections are filled with `False`, + and other unnecessary connections beyond chunk are filled with `True`. + + R: length of hard-copied right contexts; + U: length of full utterance; + M: length of memory vectors; + Q: length of attention query; + KV: length of attention key and value. + + The shape of attention mask is (Q, KV). + If self.use_memory is `True`: + query = [right_context, utterance]; + key, value = [memory, right_context, utterance]; + Q = R + U, KV = M + R + U. + Otherwise: + query = [right_context, utterance] + key, value = [right_context, utterance] + Q = R + U, KV = R + U. + + Suppose: + c_i: chunk at index i; + r_i: right context that c_i can use; + l_i: left context that c_i can use; + m_i: past memory vectors from previous layer that c_i can use; + The target chunk-wise attention is: + c_i, r_i (in query) -> l_i, c_i, r_i, m_i (in key). + """ + U = utterance.size(0) + num_chunks = math.ceil(U / self.chunk_length) + + right_context_mask = [] + utterance_mask = [] + + if self.use_memory: + num_cols = 9 + # right context and utterance both attend to memory, right context, + # utterance + right_context_utterance_cols_mask = [ + idx in [1, 4, 7] for idx in range(num_cols) + ] + else: + num_cols = 6 + # right context and utterance both attend to right context and + # utterance + right_context_utterance_cols_mask = [ + idx in [1, 4] for idx in range(num_cols) + ] + masks_to_concat = [right_context_mask, utterance_mask] + + for chunk_idx in range(num_chunks): + col_widths = self._gen_attention_mask_col_widths(chunk_idx, U) + + right_context_mask_block = _gen_attention_mask_block( + col_widths, + right_context_utterance_cols_mask, + self.right_context_length, + utterance.device, + ) + right_context_mask.append(right_context_mask_block) + + utterance_mask_block = _gen_attention_mask_block( + col_widths, + right_context_utterance_cols_mask, + min( + self.chunk_length, + U - chunk_idx * self.chunk_length, + ), + utterance.device, + ) + utterance_mask.append(utterance_mask_block) + + attention_mask = ( + 1 - torch.cat([torch.cat(mask) for mask in masks_to_concat]) + ).to(torch.bool) + return attention_mask + + def _forward( + self, x: torch.Tensor, lengths: torch.Tensor, warmup: float = 1.0 + ) -> Tuple[torch.Tensor, torch.Tensor]: + """Forward pass for training and validation mode. + + B: batch size; + D: input dimension; + U: length of utterance. + + Args: + x (torch.Tensor): + Utterance frames right-padded with right context frames, + with shape (U + right_context_length, B, D). + lengths (torch.Tensor): + With shape (B,) and i-th element representing number of valid + utterance frames for i-th batch element in x, which contains the + right_context at the end. + + Returns: + A tuple of 2 tensors: + - output utterance frames, with shape (U, B, D). + - output_lengths, with shape (B,), without containing the + right_context at the end. + """ + U = x.size(0) - self.right_context_length + + right_context = self._gen_right_context(x) + utterance = x[:U] + output_lengths = torch.clamp(lengths - self.right_context_length, min=0) + attention_mask = self._gen_attention_mask(utterance) + + M = ( + right_context.size(0) // self.right_context_length - 1 + if self.use_memory + else 0 + ) + padding_mask = make_pad_mask(M + right_context.size(0) + output_lengths) + + output = utterance + for layer in self.emformer_layers: + output, right_context = layer( + output, + right_context, + attention_mask, + padding_mask=padding_mask, + warmup=warmup, + ) + + return output, output_lengths + + @torch.jit.export + def infer( + self, + x: torch.Tensor, + states: List[torch.Tensor], + ) -> Tuple[torch.Tensor, List[torch.Tensor],]: + """Forward pass for streaming inference. + + B: batch size; + D: input dimension; + U: length of utterance. + + Args: + x (torch.Tensor): + Utterance frames right-padded with right context frames, + with shape (U + right_context_length, B, D). + lengths (torch.Tensor): + With shape (B,) and i-th element representing number of valid + utterance frames for i-th batch element in x, which contains the + right_context at the end. + states (List[torch.Tensor, List[List[torch.Tensor]], List[torch.Tensor]]: # noqa + Cached states containing: + - attn_caches: attention states from preceding chunk's computation, + where each element corresponds to each emformer layer + - conv_caches: left context for causal convolution, where each + element corresponds to each layer. + + Returns: + (Tensor, Tensor, List[List[torch.Tensor]], List[torch.Tensor]): + - output utterance frames, with shape (U, B, D). + - output lengths, with shape (B,), without containing the + right_context at the end. + - updated states from current chunk's computation. + """ + # lengths = chunk_length + right_context_length + utterance = x[: self.chunk_length] + right_context = x[self.chunk_length :] + # right_context_utterance = torch.cat([right_context, utterance]) + + output = utterance + output_states: List[torch.Tensor] = [] + for layer_idx, layer in enumerate(self.emformer_layers): + start = layer_idx * 4 + end = start + 4 + cache = states[start:end] + + (output, right_context, output_cache,) = layer.infer( + output, + right_context, + padding_mask=None, + cache=cache, + ) + output_states.extend(output_cache) + + return output, output_states + + @torch.jit.export + def init_states( + self, device: torch.device = torch.device("cpu") + ) -> List[torch.Tensor]: + """Create initial states.""" + # + states = [] + # layer0: attn cache, conv cache, 3 tensors + 1 tensor + # layer1: attn cache, conv cache, 3 tensors + 1 tensor + # layer2: attn cache, conv cache, 3 tensors + 1 tensor + # ... + # last layer: attn cache, conv cache, 3 tensors + 1 tensor + for i in range(self.num_encoder_layers): + states.append(torch.zeros(self.memory_size, 1, self.d_model, device=device)) + states.append( + torch.zeros(self.left_context_length, 1, self.d_model, device=device) + ) + states.append( + torch.zeros(self.left_context_length, 1, self.d_model, device=device) + ) + + states.append( + torch.zeros(1, self.d_model, self.cnn_module_kernel - 1, device=device) + ) + return states + + attn_caches = [ + [ + torch.zeros(self.memory_size, self.d_model, device=device), + torch.zeros(self.left_context_length, self.d_model, device=device), + torch.zeros(self.left_context_length, self.d_model, device=device), + ] + for _ in range(self.num_encoder_layers) + ] + conv_caches = [ + torch.zeros(self.d_model, self.cnn_module_kernel - 1, device=device) + for _ in range(self.num_encoder_layers) + ] + states: Tuple[List[List[torch.Tensor]], List[torch.Tensor]] = ( + attn_caches, + conv_caches, + ) + return states + + +class Emformer(EncoderInterface): + def __init__( + self, + num_features: int, + chunk_length: int, + subsampling_factor: int = 4, + d_model: int = 256, + nhead: int = 4, + dim_feedforward: int = 2048, + num_encoder_layers: int = 12, + dropout: float = 0.1, + layer_dropout: float = 0.075, + cnn_module_kernel: int = 3, + left_context_length: int = 0, + right_context_length: int = 0, + memory_size: int = 0, + tanh_on_mem: bool = False, + negative_inf: float = -1e8, + is_pnnx: bool = True, + ): + super().__init__() + + self.subsampling_factor = subsampling_factor + self.right_context_length = right_context_length + self.chunk_length = chunk_length + if subsampling_factor != 4: + raise NotImplementedError("Support only 'subsampling_factor=4'.") + if chunk_length % subsampling_factor != 0: + raise NotImplementedError( + "chunk_length must be a mutiple of subsampling_factor." + ) + if left_context_length != 0 and left_context_length % subsampling_factor != 0: + raise NotImplementedError( + "left_context_length must be 0 or a mutiple of subsampling_factor." # noqa + ) + if right_context_length != 0 and right_context_length % subsampling_factor != 0: + raise NotImplementedError( + "right_context_length must be 0 or a mutiple of subsampling_factor." # noqa + ) + + # self.encoder_embed converts the input of shape (N, T, num_features) + # to the shape (N, T//subsampling_factor, d_model). + # That is, it does two things simultaneously: + # (1) subsampling: T -> T//subsampling_factor + # (2) embedding: num_features -> d_model + self.encoder_embed = Conv2dSubsampling(num_features, d_model, is_pnnx=is_pnnx) + self.is_pnnx = is_pnnx + + self.encoder = EmformerEncoder( + chunk_length=chunk_length // subsampling_factor, + d_model=d_model, + nhead=nhead, + dim_feedforward=dim_feedforward, + num_encoder_layers=num_encoder_layers, + dropout=dropout, + layer_dropout=layer_dropout, + cnn_module_kernel=cnn_module_kernel, + left_context_length=left_context_length // subsampling_factor, + right_context_length=right_context_length // subsampling_factor, + memory_size=memory_size, + tanh_on_mem=tanh_on_mem, + negative_inf=negative_inf, + ) + + def _forward( + self, x: torch.Tensor, x_lens: torch.Tensor, warmup: float = 1.0 + ) -> Tuple[torch.Tensor, torch.Tensor]: + """Forward pass for training and non-streaming inference. + + B: batch size; + D: feature dimension; + T: length of utterance. + + Args: + x (torch.Tensor): + Utterance frames right-padded with right context frames, + with shape (B, T, D). + x_lens (torch.Tensor): + With shape (B,) and i-th element representing number of valid + utterance frames for i-th batch element in x, containing the + right_context at the end. + warmup: + A floating point value that gradually increases from 0 throughout + training; when it is >= 1.0 we are "fully warmed up". It is used + to turn modules on sequentially. + + Returns: + (Tensor, Tensor): + - output embedding, with shape (B, T', D), where + T' = ((T - 1) // 2 - 1) // 2 - self.right_context_length // 4. + - output lengths, with shape (B,), without containing the + right_context at the end. + """ + x = self.encoder_embed(x) + x = x.permute(1, 0, 2) # (N, T, C) -> (T, N, C) + + x_lens = (((x_lens - 1) >> 1) - 1) >> 1 + assert x.size(0) == x_lens.max().item() + + output, output_lengths = self.encoder(x, x_lens, warmup=warmup) # (T, N, C) + + output = output.permute(1, 0, 2) # (T, N, C) -> (N, T, C) + + return output, output_lengths + + def forward( + self, + x: torch.Tensor, + states: List[torch.Tensor], + ) -> Tuple[torch.Tensor, List[torch.Tensor],]: + """Forward pass for streaming inference. + + B: batch size; + D: feature dimension; + T: length of utterance. + + Args: + x (torch.Tensor): + Utterance frames right-padded with right context frames, + with shape (B, T, D). + lengths (torch.Tensor): + With shape (B,) and i-th element representing number of valid + utterance frames for i-th batch element in x, containing the + right_context at the end. + states (List[torch.Tensor, List[List[torch.Tensor]], List[torch.Tensor]]: # noqa + Cached states containing: + - past_lens: number of past frames for each sample in batch + - attn_caches: attention states from preceding chunk's computation, + where each element corresponds to each emformer layer + - conv_caches: left context for causal convolution, where each + element corresponds to each layer. + Returns: + (Tensor, Tensor): + - output embedding, with shape (B, T', D), where + T' = ((T - 1) // 2 - 1) // 2 - self.right_context_length // 4. + - output lengths, with shape (B,), without containing the + right_context at the end. + - updated states from current chunk's computation. + """ + x = self.encoder_embed(x) + # drop the first and last frames + x = x[:, 1:-1, :] + x = x.permute(1, 0, 2) # (N, T, C) -> (T, N, C) + + # Caution: We assume the subsampling factor is 4! + + output, output_states = self.encoder.infer(x, states) + + output = output.permute(1, 0, 2) # (T, N, C) -> (N, T, C) + + return output, output_states + + @torch.jit.export + def init_states( + self, device: torch.device = torch.device("cpu") + ) -> List[torch.Tensor]: + """Create initial states.""" + return self.encoder.init_states(device) + + +class Conv2dSubsampling(nn.Module): + """Convolutional 2D subsampling (to 1/4 length). + + Convert an input of shape (N, T, idim) to an output + with shape (N, T', odim), where + T' = ((T-1)//2 - 1)//2, which approximates T' == T//4 + + It is based on + https://github.com/espnet/espnet/blob/master/espnet/nets/pytorch_backend/transformer/subsampling.py # noqa + """ + + def __init__( + self, + in_channels: int, + out_channels: int, + layer1_channels: int = 8, + layer2_channels: int = 32, + layer3_channels: int = 128, + is_pnnx: bool = False, + ) -> None: + """ + Args: + in_channels: + Number of channels in. The input shape is (N, T, in_channels). + Caution: It requires: T >=7, in_channels >=7 + out_channels + Output dim. The output shape is (N, ((T-1)//2 - 1)//2, out_channels) + layer1_channels: + Number of channels in layer1 + layer1_channels: + Number of channels in layer2 + is_pnnx: + True if we are converting the model to PNNX format. + False otherwise. + """ + assert in_channels >= 7 + super().__init__() + + self.conv = nn.Sequential( + ScaledConv2d( + in_channels=1, + out_channels=layer1_channels, + kernel_size=3, + padding=1, + ), + ActivationBalancer(channel_dim=1), + DoubleSwish(), + ScaledConv2d( + in_channels=layer1_channels, + out_channels=layer2_channels, + kernel_size=3, + stride=2, + ), + ActivationBalancer(channel_dim=1), + DoubleSwish(), + ScaledConv2d( + in_channels=layer2_channels, + out_channels=layer3_channels, + kernel_size=3, + stride=2, + ), + ActivationBalancer(channel_dim=1), + DoubleSwish(), + ) + self.out = ScaledLinear( + layer3_channels * (((in_channels - 1) // 2 - 1) // 2), out_channels + ) + # set learn_eps=False because out_norm is preceded by `out`, and `out` + # itself has learned scale, so the extra degree of freedom is not + # needed. + self.out_norm = BasicNorm(out_channels, learn_eps=False) + # constrain median of output to be close to zero. + self.out_balancer = ActivationBalancer( + channel_dim=-1, min_positive=0.45, max_positive=0.55 + ) + + # ncnn supports only batch size == 1 + self.is_pnnx = is_pnnx + self.conv_out_dim = self.out.weight.shape[1] + + def forward(self, x: torch.Tensor) -> torch.Tensor: + """Subsample x. + + Args: + x: + Its shape is (N, T, idim). + + Returns: + Return a tensor of shape (N, ((T-1)//2 - 1)//2, odim) + """ + # On entry, x is (N, T, idim) + x = x.unsqueeze(1) # (N, T, idim) -> (N, 1, T, idim) i.e., (N, C, H, W) + x = self.conv(x) + + if torch.jit.is_tracing() and self.is_pnnx: + x = x.permute(0, 2, 1, 3).reshape(1, -1, self.conv_out_dim) + x = self.out(x) + else: + # Now x is of shape (N, odim, ((T-1)//2-1)//2, ((idim-1)//2-1)//2) + b, c, t, f = x.size() + x = self.out(x.transpose(1, 2).contiguous().view(b, t, c * f)) + # Now x is of shape (N, ((T-1)//2 - 1))//2, odim) + x = self.out_norm(x) + x = self.out_balancer(x) + return x diff --git a/egs/librispeech/ASR/conv_emformer_transducer_stateless2/export-for-ncnn.py b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/export-for-ncnn.py new file mode 100755 index 000000000..716de5734 --- /dev/null +++ b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/export-for-ncnn.py @@ -0,0 +1,335 @@ +#!/usr/bin/env python3 + +""" +Usage: +./conv_emformer_transducer_stateless2/export-for-ncnn.py \ + --exp-dir ./conv_emformer_transducer_stateless2/exp \ + --bpe-model data/lang_bpe_500/bpe.model \ + --epoch 30 \ + --avg 10 \ + --use-averaged-model=True \ + --num-encoder-layers 12 \ + --chunk-length 32 \ + --cnn-module-kernel 31 \ + --left-context-length 32 \ + --right-context-length 8 \ + --memory-size 32 \ + +cd ./conv_emformer_transducer_stateless2/exp +pnnx encoder_jit_trace-pnnx.pt +pnnx decoder_jit_trace-pnnx.pt +pnnx joiner_jit_trace-pnnx.pt + +You can find converted models at +https://huggingface.co/csukuangfj/sherpa-ncnn-conv-emformer-transducer-2022-12-04 + +See ./streaming-ncnn-decode.py +and +https://github.com/k2-fsa/sherpa-ncnn +for usage. +""" + +import argparse +import logging +from pathlib import Path + +import sentencepiece as spm +import torch +from scaling_converter import convert_scaled_to_non_scaled +from train2 import add_model_arguments, get_params, get_transducer_model + +from icefall.checkpoint import ( + average_checkpoints, + average_checkpoints_with_averaged_model, + find_checkpoints, + load_checkpoint, +) +from icefall.utils import str2bool + + +def get_parser(): + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + + parser.add_argument( + "--epoch", + type=int, + default=28, + help="""It specifies the checkpoint to use for averaging. + Note: Epoch counts from 0. + You can specify --avg to use more checkpoints for model averaging.""", + ) + + parser.add_argument( + "--iter", + type=int, + default=0, + help="""If positive, --epoch is ignored and it + will use the checkpoint exp_dir/checkpoint-iter.pt. + You can specify --avg to use more checkpoints for model averaging. + """, + ) + + parser.add_argument( + "--avg", + type=int, + default=15, + help="Number of checkpoints to average. Automatically select " + "consecutive checkpoints before the checkpoint specified by " + "'--epoch' and '--iter'", + ) + + parser.add_argument( + "--exp-dir", + type=str, + default="pruned_transducer_stateless2/exp", + help="""It specifies the directory where all training related + files, e.g., checkpoints, log, etc, are saved + """, + ) + + parser.add_argument( + "--bpe-model", + type=str, + default="data/lang_bpe_500/bpe.model", + help="Path to the BPE model", + ) + + parser.add_argument( + "--jit", + type=str2bool, + default=False, + help="""True to save a model after applying torch.jit.script. + """, + ) + + parser.add_argument( + "--context-size", + type=int, + default=2, + help="The context size in the decoder. 1 means bigram; 2 means tri-gram", + ) + + parser.add_argument( + "--use-averaged-model", + type=str2bool, + default=True, + help="Whether to load averaged model. Currently it only supports " + "using --epoch. If True, it would decode with the averaged model " + "over the epoch range from `epoch-avg` (excluded) to `epoch`." + "Actually only the models with epoch number of `epoch-avg` and " + "`epoch` are loaded for averaging. ", + ) + + add_model_arguments(parser) + + return parser + + +def export_encoder_model_jit_trace( + encoder_model: torch.nn.Module, + encoder_filename: str, +) -> None: + """Export the given encoder model with torch.jit.trace() + + Note: The warmup argument is fixed to 1. + + Args: + encoder_model: + The input encoder model + encoder_filename: + The filename to save the exported model. + """ + chunk_length = encoder_model.chunk_length # before subsampling + right_context_length = encoder_model.right_context_length # before subsampling + pad_length = right_context_length + 2 * 4 + 3 + s = f"chunk_length: {chunk_length}, " + s += f"right_context_length: {right_context_length}\n" + logging.info(s) + + T = chunk_length + pad_length + + x = torch.zeros(1, T, 80, dtype=torch.float32) + states = encoder_model.init_states() + states = encoder_model.init_states() + + traced_model = torch.jit.trace(encoder_model, (x, states)) + traced_model.save(encoder_filename) + logging.info(f"Saved to {encoder_filename}") + + +def export_decoder_model_jit_trace( + decoder_model: torch.nn.Module, + decoder_filename: str, +) -> None: + """Export the given decoder model with torch.jit.trace() + + Note: The argument need_pad is fixed to False. + + Args: + decoder_model: + The input decoder model + decoder_filename: + The filename to save the exported model. + """ + y = torch.zeros(10, decoder_model.context_size, dtype=torch.int64) + need_pad = torch.tensor([False]) + + traced_model = torch.jit.trace(decoder_model, (y, need_pad)) + traced_model.save(decoder_filename) + logging.info(f"Saved to {decoder_filename}") + + +def export_joiner_model_jit_trace( + joiner_model: torch.nn.Module, + joiner_filename: str, +) -> None: + """Export the given joiner model with torch.jit.trace() + + Note: The argument project_input is fixed to True. A user should not + project the encoder_out/decoder_out by himself/herself. The exported joiner + will do that for the user. + + Args: + joiner_model: + The input joiner model + joiner_filename: + The filename to save the exported model. + + """ + encoder_out_dim = joiner_model.encoder_proj.weight.shape[1] + decoder_out_dim = joiner_model.decoder_proj.weight.shape[1] + encoder_out = torch.rand(1, encoder_out_dim, dtype=torch.float32) + decoder_out = torch.rand(1, decoder_out_dim, dtype=torch.float32) + + traced_model = torch.jit.trace(joiner_model, (encoder_out, decoder_out)) + traced_model.save(joiner_filename) + logging.info(f"Saved to {joiner_filename}") + + +@torch.no_grad() +def main(): + args = get_parser().parse_args() + args.exp_dir = Path(args.exp_dir) + + params = get_params() + params.update(vars(args)) + + device = torch.device("cpu") + + logging.info(f"device: {device}") + + sp = spm.SentencePieceProcessor() + sp.load(params.bpe_model) + + # is defined in local/train_bpe_model.py + params.blank_id = sp.piece_to_id("") + params.vocab_size = sp.get_piece_size() + + logging.info(params) + + logging.info("About to create model") + model = get_transducer_model(params) + + if not params.use_averaged_model: + if params.iter > 0: + filenames = find_checkpoints(params.exp_dir, iteration=-params.iter)[ + : params.avg + ] + if len(filenames) == 0: + raise ValueError( + f"No checkpoints found for" + f" --iter {params.iter}, --avg {params.avg}" + ) + elif len(filenames) < params.avg: + raise ValueError( + f"Not enough checkpoints ({len(filenames)}) found for" + f" --iter {params.iter}, --avg {params.avg}" + ) + logging.info(f"averaging {filenames}") + model.to(device) + model.load_state_dict(average_checkpoints(filenames, device=device)) + elif params.avg == 1: + load_checkpoint(f"{params.exp_dir}/epoch-{params.epoch}.pt", model) + else: + start = params.epoch - params.avg + 1 + filenames = [] + for i in range(start, params.epoch + 1): + if i >= 1: + filenames.append(f"{params.exp_dir}/epoch-{i}.pt") + logging.info(f"averaging {filenames}") + model.to(device) + model.load_state_dict(average_checkpoints(filenames, device=device)) + else: + if params.iter > 0: + filenames = find_checkpoints(params.exp_dir, iteration=-params.iter)[ + : params.avg + 1 + ] + if len(filenames) == 0: + raise ValueError( + f"No checkpoints found for" + f" --iter {params.iter}, --avg {params.avg}" + ) + elif len(filenames) < params.avg + 1: + raise ValueError( + f"Not enough checkpoints ({len(filenames)}) found for" + f" --iter {params.iter}, --avg {params.avg}" + ) + filename_start = filenames[-1] + filename_end = filenames[0] + logging.info( + "Calculating the averaged model over iteration checkpoints" + f" from {filename_start} (excluded) to {filename_end}" + ) + model.to(device) + model.load_state_dict( + average_checkpoints_with_averaged_model( + filename_start=filename_start, + filename_end=filename_end, + device=device, + ) + ) + else: + assert params.avg > 0, params.avg + start = params.epoch - params.avg + assert start >= 1, start + filename_start = f"{params.exp_dir}/epoch-{start}.pt" + filename_end = f"{params.exp_dir}/epoch-{params.epoch}.pt" + logging.info( + f"Calculating the averaged model over epoch range from " + f"{start} (excluded) to {params.epoch}" + ) + model.to(device) + model.load_state_dict( + average_checkpoints_with_averaged_model( + filename_start=filename_start, + filename_end=filename_end, + device=device, + ) + ) + + model.to("cpu") + model.eval() + + convert_scaled_to_non_scaled(model, inplace=True) + logging.info("Using torch.jit.trace()") + + logging.info("Exporting encoder") + encoder_filename = params.exp_dir / "encoder_jit_trace-pnnx.pt" + export_encoder_model_jit_trace(model.encoder, encoder_filename) + + logging.info("Exporting decoder") + decoder_filename = params.exp_dir / "decoder_jit_trace-pnnx.pt" + export_decoder_model_jit_trace(model.decoder, decoder_filename) + + logging.info("Exporting joiner") + joiner_filename = params.exp_dir / "joiner_jit_trace-pnnx.pt" + export_joiner_model_jit_trace(model.joiner, joiner_filename) + + +if __name__ == "__main__": + formatter = "%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s" + + logging.basicConfig(format=formatter, level=logging.INFO) + main() diff --git a/egs/librispeech/ASR/conv_emformer_transducer_stateless2/jit_pretrained.py b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/jit_pretrained.py new file mode 100755 index 000000000..1fe358c79 --- /dev/null +++ b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/jit_pretrained.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python3 +# flake8: noqa +# Copyright 2022 Xiaomi Corp. (authors: Fangjun Kuang, Zengwei Yao) +# +# See ../../../../LICENSE for clarification regarding multiple authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +This script loads torchscript models exported by `torch.jit.trace()` +and uses them to decode waves. +You can use the following command to get the exported models: + +./conv_emformer_transducer_stateless2/export-for-ncnn.py \ + --exp-dir ./conv_emformer_transducer_stateless2/exp \ + --bpe-model data/lang_bpe_500/bpe.model \ + --epoch 20 \ + --avg 10 + +Usage of this script: + +./conv_emformer_transducer_stateless2/jit_pretrained.py \ + --encoder-model-filename ./conv_emformer_transducer_stateless2/exp/encoder_jit_trace-pnnx.pt \ + --decoder-model-filename ./conv_emformer_transducer_stateless2/exp/decoder_jit_trace-pnnx.pt \ + --joiner-model-filename ./conv_emformer_transducer_stateless2/exp/joiner_jit_trace-pnnx.pt \ + --bpe-model ./data/lang_bpe_500/bpe.model \ + /path/to/foo.wav \ +""" + +import argparse +import logging +import math +from typing import List + +import kaldifeat +import sentencepiece as spm +import torch +import torchaudio +from kaldifeat import FbankOptions, OnlineFbank, OnlineFeature +from torch.nn.utils.rnn import pad_sequence +from typing import Optional, List + + +def get_parser(): + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + + parser.add_argument( + "--encoder-model-filename", + type=str, + required=True, + help="Path to the encoder torchscript model. ", + ) + + parser.add_argument( + "--decoder-model-filename", + type=str, + required=True, + help="Path to the decoder torchscript model. ", + ) + + parser.add_argument( + "--joiner-model-filename", + type=str, + required=True, + help="Path to the joiner torchscript model. ", + ) + + parser.add_argument( + "--bpe-model", + type=str, + help="""Path to bpe.model.""", + ) + + parser.add_argument( + "sound_file", + type=str, + help="The input sound file(s) to transcribe. " + "Supported formats are those supported by torchaudio.load(). " + "For example, wav and flac are supported. " + "The sample rate has to be 16kHz.", + ) + + parser.add_argument( + "--sample-rate", + type=int, + default=16000, + help="The sample rate of the input sound file", + ) + + parser.add_argument( + "--context-size", + type=int, + default=2, + help="Context size of the decoder model", + ) + + return parser + + +def read_sound_files( + filenames: List[str], expected_sample_rate: float +) -> List[torch.Tensor]: + """Read a list of sound files into a list 1-D float32 torch tensors. + Args: + filenames: + A list of sound filenames. + expected_sample_rate: + The expected sample rate of the sound files. + Returns: + Return a list of 1-D float32 torch tensors. + """ + ans = [] + for f in filenames: + wave, sample_rate = torchaudio.load(f) + assert ( + sample_rate == expected_sample_rate + ), f"expected sample rate: {expected_sample_rate}. Given: {sample_rate}" + # We use only the first channel + ans.append(wave[0]) + return ans + + +def greedy_search( + decoder: torch.jit.ScriptModule, + joiner: torch.jit.ScriptModule, + encoder_out: torch.Tensor, + decoder_out: Optional[torch.Tensor] = None, + hyp: Optional[List[int]] = None, +): + assert encoder_out.ndim == 2 + context_size = 2 + blank_id = 0 + + if decoder_out is None: + assert hyp is None, hyp + hyp = [blank_id] * context_size + decoder_input = torch.tensor(hyp, dtype=torch.int32).unsqueeze(0) + # decoder_input.shape (1,, 1 context_size) + decoder_out = decoder(decoder_input, torch.tensor([0])).squeeze(1) + else: + assert decoder_out.ndim == 2 + assert hyp is not None, hyp + + T = encoder_out.size(0) + for i in range(T): + cur_encoder_out = encoder_out[i : i + 1] + joiner_out = joiner(cur_encoder_out, decoder_out).squeeze(0) + y = joiner_out.argmax(dim=0).item() + + if y != blank_id: + hyp.append(y) + decoder_input = hyp[-context_size:] + + decoder_input = torch.tensor(decoder_input, dtype=torch.int32).unsqueeze(0) + decoder_out = decoder(decoder_input, torch.tensor([0])).squeeze(1) + + return hyp, decoder_out + + +def create_streaming_feature_extractor(sample_rate) -> OnlineFeature: + """Create a CPU streaming feature extractor. + + At present, we assume it returns a fbank feature extractor with + fixed options. In the future, we will support passing in the options + from outside. + + Returns: + Return a CPU streaming feature extractor. + """ + opts = FbankOptions() + opts.device = "cpu" + opts.frame_opts.dither = 0 + opts.frame_opts.snip_edges = False + opts.frame_opts.samp_freq = sample_rate + opts.mel_opts.num_bins = 80 + return OnlineFbank(opts) + + +@torch.no_grad() +def main(): + parser = get_parser() + args = parser.parse_args() + logging.info(vars(args)) + + device = torch.device("cpu") + if torch.cuda.is_available(): + device = torch.device("cuda", 0) + + logging.info(f"device: {device}") + + encoder = torch.jit.load(args.encoder_model_filename) + decoder = torch.jit.load(args.decoder_model_filename) + joiner = torch.jit.load(args.joiner_model_filename) + + encoder.eval() + decoder.eval() + joiner.eval() + + encoder.to(device) + decoder.to(device) + joiner.to(device) + + sp = spm.SentencePieceProcessor() + sp.load(args.bpe_model) + + logging.info("Constructing Fbank computer") + online_fbank = create_streaming_feature_extractor(args.sample_rate) + + logging.info(f"Reading sound files: {args.sound_file}") + wave_samples = read_sound_files( + filenames=[args.sound_file], + expected_sample_rate=args.sample_rate, + )[0] + logging.info(wave_samples.shape) + + logging.info("Decoding started") + chunk_length = encoder.chunk_length + right_context_length = encoder.right_context_length + + # Assume the subsampling factor is 4 + pad_length = right_context_length + 2 * 4 + 3 + T = chunk_length + pad_length + + logging.info(f"chunk_length: {chunk_length}") + logging.info(f"right_context_length: {right_context_length}") + + states = encoder.init_states(device) + logging.info(f"num layers: {len(states)//4}") + + tail_padding = torch.zeros(int(0.3 * args.sample_rate), dtype=torch.float32) + + wave_samples = torch.cat([wave_samples, tail_padding]) + + chunk = int(0.25 * args.sample_rate) # 0.2 second + num_processed_frames = 0 + + hyp = None + decoder_out = None + + start = 0 + while start < wave_samples.numel(): + logging.info(f"{start}/{wave_samples.numel()}") + end = min(start + chunk, wave_samples.numel()) + samples = wave_samples[start:end] + start += chunk + online_fbank.accept_waveform( + sampling_rate=args.sample_rate, + waveform=samples, + ) + while online_fbank.num_frames_ready - num_processed_frames >= T: + frames = [] + for i in range(T): + frames.append(online_fbank.get_frame(num_processed_frames + i)) + num_processed_frames += chunk_length + frames = torch.cat(frames, dim=0).unsqueeze(0) + # TODO(fangjun): remove x_lens + x_lens = torch.tensor([T]) + encoder_out, _, states = encoder(frames, x_lens, states) + + hyp, decoder_out = greedy_search( + decoder, joiner, encoder_out.squeeze(0), decoder_out, hyp + ) + + context_size = 2 + + logging.info(args.sound_file) + logging.info(sp.decode(hyp[context_size:])) + + logging.info("Decoding Done") + + +torch.set_num_threads(4) +torch.set_num_interop_threads(1) +torch._C._jit_set_profiling_executor(False) +torch._C._jit_set_profiling_mode(False) +torch._C._set_graph_executor_optimize(False) +if __name__ == "__main__": + formatter = "%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s" + + logging.basicConfig(format=formatter, level=logging.INFO) + main() diff --git a/egs/librispeech/ASR/conv_emformer_transducer_stateless2/lstmp.py b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/lstmp.py new file mode 120000 index 000000000..4f377cd01 --- /dev/null +++ b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/lstmp.py @@ -0,0 +1 @@ +../lstm_transducer_stateless2/lstmp.py \ No newline at end of file diff --git a/egs/librispeech/ASR/conv_emformer_transducer_stateless2/scaling_converter.py b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/scaling_converter.py new file mode 120000 index 000000000..3b667058d --- /dev/null +++ b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/scaling_converter.py @@ -0,0 +1 @@ +../pruned_transducer_stateless3/scaling_converter.py \ No newline at end of file diff --git a/egs/librispeech/ASR/conv_emformer_transducer_stateless2/streaming-ncnn-decode.py b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/streaming-ncnn-decode.py new file mode 100755 index 000000000..b21fe5c7e --- /dev/null +++ b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/streaming-ncnn-decode.py @@ -0,0 +1,387 @@ +#!/usr/bin/env python3 +# +# Copyright 2022 Xiaomi Corp. (authors: Fangjun Kuang, Zengwei Yao) +# +# See ../../../../LICENSE for clarification regarding multiple authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Usage: + +./conv_emformer_transducer_stateless2/streaming-ncnn-decode.py \ + --tokens ./sherpa-ncnn-conv-emformer-transducer-2022-12-04/tokens.txt \ + --encoder-param-filename ./sherpa-ncnn-conv-emformer-transducer-2022-12-04/encoder_jit_trace-epoch-30-avg-10-pnnx.ncnn.param \ + --encoder-bin-filename ./sherpa-ncnn-conv-emformer-transducer-2022-12-04/encoder_jit_trace-epoch-30-avg-10-pnnx.ncnn.bin \ + --decoder-param-filename ./sherpa-ncnn-conv-emformer-transducer-2022-12-04/decoder_jit_trace-epoch-30-avg-10-pnnx.ncnn.param \ + --decoder-bin-filename ./sherpa-ncnn-conv-emformer-transducer-2022-12-04/decoder_jit_trace-epoch-30-avg-10-pnnx.ncnn.bin \ + --joiner-param-filename ./sherpa-ncnn-conv-emformer-transducer-2022-12-04/joiner_jit_trace-epoch-30-avg-10-pnnx.ncnn.param \ + --joiner-bin-filename ./sherpa-ncnn-conv-emformer-transducer-2022-12-04/joiner_jit_trace-epoch-30-avg-10-pnnx.ncnn.bin \ + ./sherpa-ncnn-conv-emformer-transducer-2022-12-04/test_wavs/1089-134686-0001.wav + +You can find pretrained models at +https://huggingface.co/csukuangfj/sherpa-ncnn-conv-emformer-transducer-2022-12-04 +""" + +import argparse +import logging +from typing import List, Optional, Tuple + +import k2 +import ncnn +import torch +import torchaudio +from kaldifeat import FbankOptions, OnlineFbank, OnlineFeature + + +def get_args(): + parser = argparse.ArgumentParser() + + parser.add_argument( + "--tokens", + type=str, + help="Path to tokens.txt", + ) + + parser.add_argument( + "--encoder-param-filename", + type=str, + help="Path to encoder.ncnn.param", + ) + + parser.add_argument( + "--encoder-bin-filename", + type=str, + help="Path to encoder.ncnn.bin", + ) + + parser.add_argument( + "--decoder-param-filename", + type=str, + help="Path to decoder.ncnn.param", + ) + + parser.add_argument( + "--decoder-bin-filename", + type=str, + help="Path to decoder.ncnn.bin", + ) + + parser.add_argument( + "--joiner-param-filename", + type=str, + help="Path to joiner.ncnn.param", + ) + + parser.add_argument( + "--joiner-bin-filename", + type=str, + help="Path to joiner.ncnn.bin", + ) + + parser.add_argument( + "sound_filename", + type=str, + help="Path to foo.wav", + ) + + return parser.parse_args() + + +class Model: + def __init__(self, args): + self.init_encoder(args) + self.init_decoder(args) + self.init_joiner(args) + + self.num_layers = 12 + self.memory_size = 32 + self.d_model = 512 + self.cnn_module_kernel = 31 + + self.left_context_length = 32 // 4 # after subsampling + self.chunk_length = 32 # before subsampling + right_context_length = 8 # before subsampling + pad_length = right_context_length + 2 * 4 + 3 + self.T = self.chunk_length + pad_length + print("T", self.T, self.chunk_length) + + def get_init_states(self) -> List[torch.Tensor]: + states = [] + + for i in range(self.num_layers): + s0 = torch.zeros(self.memory_size, self.d_model) + s1 = torch.zeros(self.left_context_length, self.d_model) + s2 = torch.zeros(self.left_context_length, self.d_model) + s3 = torch.zeros(self.d_model, self.cnn_module_kernel - 1) + states.extend([s0, s1, s2, s3]) + + return states + + def init_encoder(self, args): + encoder_net = ncnn.Net() + encoder_net.opt.use_packing_layout = False + encoder_net.opt.use_fp16_storage = False + encoder_param = args.encoder_param_filename + encoder_model = args.encoder_bin_filename + + encoder_net.load_param(encoder_param) + encoder_net.load_model(encoder_model) + + self.encoder_net = encoder_net + + def init_decoder(self, args): + decoder_param = args.decoder_param_filename + decoder_model = args.decoder_bin_filename + + decoder_net = ncnn.Net() + + decoder_net.load_param(decoder_param) + decoder_net.load_model(decoder_model) + + self.decoder_net = decoder_net + + def init_joiner(self, args): + joiner_param = args.joiner_param_filename + joiner_model = args.joiner_bin_filename + joiner_net = ncnn.Net() + joiner_net.load_param(joiner_param) + joiner_net.load_model(joiner_model) + + self.joiner_net = joiner_net + + def run_encoder( + self, + x: torch.Tensor, + states: List[torch.Tensor], + ) -> Tuple[torch.Tensor, List[torch.Tensor]]: + """ + Args: + x: + A tensor of shape (T, C) + states: + A list of tensors. len(states) == self.num_layers * 4 + Returns: + Return a tuple containing: + - encoder_out, a tensor of shape (T, encoder_dim). + - next_states, a list of tensors containing the next states + """ + with self.encoder_net.create_extractor() as ex: + ex.set_num_threads(4) + ex.input("in0", ncnn.Mat(x.numpy()).clone()) + + # layer0 in2-in5 + # layer1 in6-in9 + for i in range(self.num_layers): + offset = 1 + i * 4 + name = f"in{offset}" + # (32, 1, 512) -> (32, 512) + ex.input(name, ncnn.Mat(states[i * 4 + 0].numpy()).clone()) + + name = f"in{offset+1}" + # (8, 1, 512) -> (8, 512) + ex.input(name, ncnn.Mat(states[i * 4 + 1].numpy()).clone()) + + name = f"in{offset+2}" + # (8, 1, 512) -> (8, 512) + ex.input(name, ncnn.Mat(states[i * 4 + 2].numpy()).clone()) + + name = f"in{offset+3}" + # (1, 512, 2) -> (512, 2) + ex.input(name, ncnn.Mat(states[i * 4 + 3].numpy()).clone()) + + import pdb + + # pdb.set_trace() + ret, ncnn_out0 = ex.extract("out0") + # assert ret == 0, ret + encoder_out = torch.from_numpy(ncnn_out0.numpy()).clone() + + out_states: List[torch.Tensor] = [] + for i in range(4 * self.num_layers): + name = f"out{i+1}" + ret, ncnn_out_state = ex.extract(name) + assert ret == 0, ret + ncnn_out_state = torch.from_numpy(ncnn_out_state.numpy()) + out_states.append(ncnn_out_state) + + return encoder_out, out_states + + def run_decoder(self, decoder_input): + assert decoder_input.dtype == torch.int32 + + with self.decoder_net.create_extractor() as ex: + ex.set_num_threads(4) + ex.input("in0", ncnn.Mat(decoder_input.numpy()).clone()) + ret, ncnn_out0 = ex.extract("out0") + assert ret == 0, ret + decoder_out = torch.from_numpy(ncnn_out0.numpy()).clone() + return decoder_out + + def run_joiner(self, encoder_out, decoder_out): + with self.joiner_net.create_extractor() as ex: + ex.set_num_threads(4) + ex.input("in0", ncnn.Mat(encoder_out.numpy()).clone()) + ex.input("in1", ncnn.Mat(decoder_out.numpy()).clone()) + ret, ncnn_out0 = ex.extract("out0") + assert ret == 0, ret + joiner_out = torch.from_numpy(ncnn_out0.numpy()).clone() + return joiner_out + + +def read_sound_files( + filenames: List[str], expected_sample_rate: float +) -> List[torch.Tensor]: + """Read a list of sound files into a list 1-D float32 torch tensors. + Args: + filenames: + A list of sound filenames. + expected_sample_rate: + The expected sample rate of the sound files. + Returns: + Return a list of 1-D float32 torch tensors. + """ + ans = [] + for f in filenames: + wave, sample_rate = torchaudio.load(f) + assert ( + sample_rate == expected_sample_rate + ), f"expected sample rate: {expected_sample_rate}. Given: {sample_rate}" + # We use only the first channel + ans.append(wave[0]) + return ans + + +def create_streaming_feature_extractor() -> OnlineFeature: + """Create a CPU streaming feature extractor. + + At present, we assume it returns a fbank feature extractor with + fixed options. In the future, we will support passing in the options + from outside. + + Returns: + Return a CPU streaming feature extractor. + """ + opts = FbankOptions() + opts.device = "cpu" + opts.frame_opts.dither = 0 + opts.frame_opts.snip_edges = False + opts.frame_opts.samp_freq = 16000 + opts.mel_opts.num_bins = 80 + return OnlineFbank(opts) + + +def greedy_search( + model: Model, + encoder_out: torch.Tensor, + decoder_out: Optional[torch.Tensor] = None, + hyp: Optional[List[int]] = None, +): + context_size = 2 + blank_id = 0 + + if decoder_out is None: + assert hyp is None, hyp + hyp = [blank_id] * context_size + decoder_input = torch.tensor(hyp, dtype=torch.int32) # (1, context_size) + decoder_out = model.run_decoder(decoder_input).squeeze(0) + else: + assert decoder_out.ndim == 1 + assert hyp is not None, hyp + + T = encoder_out.size(0) + for t in range(T): + cur_encoder_out = encoder_out[t] + + joiner_out = model.run_joiner(cur_encoder_out, decoder_out) + y = joiner_out.argmax(dim=0).item() + if y != blank_id: + hyp.append(y) + decoder_input = hyp[-context_size:] + decoder_input = torch.tensor(decoder_input, dtype=torch.int32) + decoder_out = model.run_decoder(decoder_input).squeeze(0) + + return hyp, decoder_out + + +def main(): + args = get_args() + logging.info(vars(args)) + + model = Model(args) + + sound_file = args.sound_filename + + sample_rate = 16000 + + logging.info("Constructing Fbank computer") + online_fbank = create_streaming_feature_extractor() + + logging.info(f"Reading sound files: {sound_file}") + wave_samples = read_sound_files( + filenames=[sound_file], + expected_sample_rate=sample_rate, + )[0] + logging.info(wave_samples.shape) + + tail_padding = torch.zeros(int(0.3 * sample_rate), dtype=torch.float32) + + wave_samples = torch.cat([wave_samples, tail_padding]) + + states = model.get_init_states() + + hyp = None + decoder_out = None + + num_processed_frames = 0 + segment = model.T + offset = model.chunk_length + + chunk = int(1 * sample_rate) # 0.2 second + + start = 0 + while start < wave_samples.numel(): + end = min(start + chunk, wave_samples.numel()) + samples = wave_samples[start:end] + start += chunk + + online_fbank.accept_waveform( + sampling_rate=sample_rate, + waveform=samples, + ) + while online_fbank.num_frames_ready - num_processed_frames >= segment: + frames = [] + for i in range(segment): + frames.append(online_fbank.get_frame(num_processed_frames + i)) + num_processed_frames += offset + frames = torch.cat(frames, dim=0) + encoder_out, states = model.run_encoder(frames, states) + hyp, decoder_out = greedy_search(model, encoder_out, decoder_out, hyp) + + symbol_table = k2.SymbolTable.from_file(args.tokens) + + context_size = 2 + text = "" + for i in hyp[context_size:]: + text += symbol_table[i] + text = text.replace("▁", " ").strip() + + logging.info(sound_file) + logging.info(text) + + +if __name__ == "__main__": + formatter = "%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s" + + logging.basicConfig(format=formatter, level=logging.INFO) + + main() diff --git a/egs/librispeech/ASR/conv_emformer_transducer_stateless2/train2.py b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/train2.py new file mode 100755 index 000000000..c91f94876 --- /dev/null +++ b/egs/librispeech/ASR/conv_emformer_transducer_stateless2/train2.py @@ -0,0 +1,1128 @@ +#!/usr/bin/env python3 +# Copyright 2021 Xiaomi Corp. (authors: Fangjun Kuang, +# Wei Kang, +# Mingshuang Luo,) +# Zengwei Yao) +# +# See ../../../../LICENSE for clarification regarding multiple authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Usage: + +export CUDA_VISIBLE_DEVICES="0,1,2,3" + +./conv_emformer_transducer_stateless2/train.py \ + --world-size 4 \ + --num-epochs 30 \ + --start-epoch 1 \ + --exp-dir conv_emformer_transducer_stateless2/exp \ + --full-libri 1 \ + --max-duration 280 \ + --master-port 12321 \ + --num-encoder-layers 12 \ + --chunk-length 32 \ + --cnn-module-kernel 31 \ + --left-context-length 32 \ + --right-context-length 8 \ + --memory-size 32 + +# For mix precision training: +./conv_emformer_transducer_stateless2/train.py \ + --world-size 4 \ + --num-epochs 30 \ + --start-epoch 1 \ + --use-fp16 1 \ + --exp-dir conv_emformer_transducer_stateless2/exp \ + --full-libri 1 \ + --max-duration 300 \ + --master-port 12321 \ + --num-encoder-layers 12 \ + --chunk-length 32 \ + --cnn-module-kernel 31 \ + --left-context-length 32 \ + --right-context-length 8 \ + --memory-size 32 +""" + + +import argparse +import copy +import logging +import warnings +from pathlib import Path +from shutil import copyfile +from typing import Any, Dict, Optional, Tuple, Union + +import k2 +import optim +import sentencepiece as spm +import torch +import torch.multiprocessing as mp +import torch.nn as nn +from asr_datamodule import LibriSpeechAsrDataModule +from decoder import Decoder +from emformer2 import Emformer +from joiner import Joiner +from lhotse.cut import Cut +from lhotse.dataset.sampling.base import CutSampler +from lhotse.utils import fix_random_seed +from model import Transducer +from optim import Eden, Eve +from torch import Tensor +from torch.cuda.amp import GradScaler +from torch.nn.parallel import DistributedDataParallel as DDP +from torch.utils.tensorboard import SummaryWriter + +from icefall import diagnostics +from icefall.checkpoint import load_checkpoint, remove_checkpoints +from icefall.checkpoint import save_checkpoint as save_checkpoint_impl +from icefall.checkpoint import ( + save_checkpoint_with_global_batch_idx, + update_averaged_model, +) +from icefall.dist import cleanup_dist, setup_dist +from icefall.env import get_env_info +from icefall.utils import AttributeDict, MetricsTracker, setup_logger, str2bool + +LRSchedulerType = Union[torch.optim.lr_scheduler._LRScheduler, optim.LRScheduler] + + +def add_model_arguments(parser: argparse.ArgumentParser): + parser.add_argument( + "--encoder-dim", + type=int, + default=512, + help="Attention dim for the Emformer", + ) + + parser.add_argument( + "--nhead", + type=int, + default=8, + help="Number of attention heads for the Emformer", + ) + + parser.add_argument( + "--dim-feedforward", + type=int, + default=2048, + help="Feed-forward dimension for the Emformer", + ) + + parser.add_argument( + "--num-encoder-layers", + type=int, + default=12, + help="Number of encoder layers for the Emformer", + ) + + parser.add_argument( + "--cnn-module-kernel", + type=int, + default=31, + help="Kernel size for the convolution module.", + ) + + parser.add_argument( + "--left-context-length", + type=int, + default=32, + help="""Number of frames before subsampling for left context + in the Emformer.""", + ) + + parser.add_argument( + "--chunk-length", + type=int, + default=32, + help="""Number of frames before subsampling for each chunk + in the Emformer.""", + ) + + parser.add_argument( + "--right-context-length", + type=int, + default=8, + help="""Number of frames before subsampling for right context + in the Emformer.""", + ) + + parser.add_argument( + "--memory-size", + type=int, + default=0, + help="Number of entries in the memory for the Emformer", + ) + + +def get_parser(): + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + + parser.add_argument( + "--world-size", + type=int, + default=1, + help="Number of GPUs for DDP training.", + ) + + parser.add_argument( + "--master-port", + type=int, + default=12354, + help="Master port to use for DDP training.", + ) + + parser.add_argument( + "--tensorboard", + type=str2bool, + default=True, + help="Should various information be logged in tensorboard.", + ) + + parser.add_argument( + "--num-epochs", + type=int, + default=30, + help="Number of epochs to train.", + ) + + parser.add_argument( + "--start-epoch", + type=int, + default=1, + help="""Resume training from this epoch. It should be positive. + If larger than 1, it will load checkpoint from + exp-dir/epoch-{start_epoch-1}.pt + """, + ) + + parser.add_argument( + "--start-batch", + type=int, + default=0, + help="""If positive, --start-epoch is ignored and + it loads the checkpoint from exp-dir/checkpoint-{start_batch}.pt + """, + ) + + parser.add_argument( + "--exp-dir", + type=str, + default="pruned_transducer_stateless2/exp", + help="""The experiment dir. + It specifies the directory where all training related + files, e.g., checkpoints, log, etc, are saved + """, + ) + + parser.add_argument( + "--bpe-model", + type=str, + default="data/lang_bpe_500/bpe.model", + help="Path to the BPE model", + ) + + parser.add_argument( + "--initial-lr", + type=float, + default=0.003, + help="""The initial learning rate. This value should not need to be + changed.""", + ) + + parser.add_argument( + "--lr-batches", + type=float, + default=5000, + help="""Number of steps that affects how rapidly the learning rate decreases. + We suggest not to change this.""", + ) + + parser.add_argument( + "--lr-epochs", + type=float, + default=6, + help="""Number of epochs that affects how rapidly the learning rate decreases. + """, + ) + + parser.add_argument( + "--context-size", + type=int, + default=2, + help="The context size in the decoder. 1 means bigram; 2 means tri-gram", + ) + + parser.add_argument( + "--prune-range", + type=int, + default=5, + help="The prune range for rnnt loss, it means how many symbols(context)" + "we are using to compute the loss", + ) + + parser.add_argument( + "--lm-scale", + type=float, + default=0.25, + help="The scale to smooth the loss with lm " + "(output of prediction network) part.", + ) + + parser.add_argument( + "--am-scale", + type=float, + default=0.0, + help="The scale to smooth the loss with am (output of encoder network) part.", + ) + + parser.add_argument( + "--simple-loss-scale", + type=float, + default=0.5, + help="To get pruning ranges, we will calculate a simple version" + "loss(joiner is just addition), this simple loss also uses for" + "training (as a regularization item). We will scale the simple loss" + "with this parameter before adding to the final loss.", + ) + + parser.add_argument( + "--seed", + type=int, + default=42, + help="The seed for random generators intended for reproducibility", + ) + + parser.add_argument( + "--print-diagnostics", + type=str2bool, + default=False, + help="Accumulate stats on activations, print them and exit.", + ) + + parser.add_argument( + "--save-every-n", + type=int, + default=8000, + help="""Save checkpoint after processing this number of batches" + periodically. We save checkpoint to exp-dir/ whenever + params.batch_idx_train % save_every_n == 0. The checkpoint filename + has the form: f'exp-dir/checkpoint-{params.batch_idx_train}.pt' + Note: It also saves checkpoint to `exp-dir/epoch-xxx.pt` at the + end of each epoch where `xxx` is the epoch number counting from 0. + """, + ) + + parser.add_argument( + "--keep-last-k", + type=int, + default=20, + help="""Only keep this number of checkpoints on disk. + For instance, if it is 3, there are only 3 checkpoints + in the exp-dir with filenames `checkpoint-xxx.pt`. + It does not affect checkpoints with name `epoch-xxx.pt`. + """, + ) + + parser.add_argument( + "--average-period", + type=int, + default=100, + help="""Update the averaged model, namely `model_avg`, after processing + this number of batches. `model_avg` is a separate version of model, + in which each floating-point parameter is the average of all the + parameters from the start of training. Each time we take the average, + we do: `model_avg = model * (average_period / batch_idx_train) + + model_avg * ((batch_idx_train - average_period) / batch_idx_train)`. + """, + ) + + parser.add_argument( + "--use-fp16", + type=str2bool, + default=False, + help="Whether to use half precision training.", + ) + + add_model_arguments(parser) + + return parser + + +def get_params() -> AttributeDict: + """Return a dict containing training parameters. + + All training related parameters that are not passed from the commandline + are saved in the variable `params`. + + Commandline options are merged into `params` after they are parsed, so + you can also access them via `params`. + + Explanation of options saved in `params`: + + - best_train_loss: Best training loss so far. It is used to select + the model that has the lowest training loss. It is + updated during the training. + + - best_valid_loss: Best validation loss so far. It is used to select + the model that has the lowest validation loss. It is + updated during the training. + + - best_train_epoch: It is the epoch that has the best training loss. + + - best_valid_epoch: It is the epoch that has the best validation loss. + + - batch_idx_train: Used to writing statistics to tensorboard. It + contains number of batches trained so far across + epochs. + + - log_interval: Print training loss if batch_idx % log_interval` is 0 + + - reset_interval: Reset statistics if batch_idx % reset_interval is 0 + + - valid_interval: Run validation if batch_idx % valid_interval is 0 + + - feature_dim: The model input dim. It has to match the one used + in computing features. + + - subsampling_factor: The subsampling factor for the model. + + - encoder_dim: Hidden dim for multi-head attention model. + + - num_decoder_layers: Number of decoder layer of transformer decoder. + + - warm_step: The warm_step for Noam optimizer. + """ + params = AttributeDict( + { + "best_train_loss": float("inf"), + "best_valid_loss": float("inf"), + "best_train_epoch": -1, + "best_valid_epoch": -1, + "batch_idx_train": 0, + "log_interval": 50, + "reset_interval": 200, + "valid_interval": 3000, # For the 100h subset, use 800 + # parameters for Emformer + "feature_dim": 80, + "subsampling_factor": 4, + # parameters for decoder + "decoder_dim": 512, + # parameters for joiner + "joiner_dim": 512, + # parameters for Noam + "model_warm_step": 3000, # arg given to model, not for lrate + "env_info": get_env_info(), + } + ) + + return params + + +def get_encoder_model(params: AttributeDict) -> nn.Module: + # TODO: We can add an option to switch between Conformer and Transformer + encoder = Emformer( + num_features=params.feature_dim, + chunk_length=params.chunk_length, + subsampling_factor=params.subsampling_factor, + d_model=params.encoder_dim, + nhead=params.nhead, + dim_feedforward=params.dim_feedforward, + num_encoder_layers=params.num_encoder_layers, + cnn_module_kernel=params.cnn_module_kernel, + left_context_length=params.left_context_length, + right_context_length=params.right_context_length, + memory_size=params.memory_size, + ) + return encoder + + +def get_decoder_model(params: AttributeDict) -> nn.Module: + decoder = Decoder( + vocab_size=params.vocab_size, + decoder_dim=params.decoder_dim, + blank_id=params.blank_id, + context_size=params.context_size, + ) + return decoder + + +def get_joiner_model(params: AttributeDict) -> nn.Module: + joiner = Joiner( + encoder_dim=params.encoder_dim, + decoder_dim=params.decoder_dim, + joiner_dim=params.joiner_dim, + vocab_size=params.vocab_size, + ) + return joiner + + +def get_transducer_model(params: AttributeDict) -> nn.Module: + encoder = get_encoder_model(params) + decoder = get_decoder_model(params) + joiner = get_joiner_model(params) + + model = Transducer( + encoder=encoder, + decoder=decoder, + joiner=joiner, + encoder_dim=params.encoder_dim, + decoder_dim=params.decoder_dim, + joiner_dim=params.joiner_dim, + vocab_size=params.vocab_size, + ) + return model + + +def load_checkpoint_if_available( + params: AttributeDict, + model: nn.Module, + model_avg: nn.Module = None, + optimizer: Optional[torch.optim.Optimizer] = None, + scheduler: Optional[LRSchedulerType] = None, +) -> Optional[Dict[str, Any]]: + """Load checkpoint from file. + + If params.start_batch is positive, it will load the checkpoint from + `params.exp_dir/checkpoint-{params.start_batch}.pt`. Otherwise, if + params.start_epoch is larger than 1, it will load the checkpoint from + `params.start_epoch - 1`. + + Apart from loading state dict for `model` and `optimizer` it also updates + `best_train_epoch`, `best_train_loss`, `best_valid_epoch`, + and `best_valid_loss` in `params`. + + Args: + params: + The return value of :func:`get_params`. + model: + The training model. + model_avg: + The stored model averaged from the start of training. + optimizer: + The optimizer that we are using. + scheduler: + The scheduler that we are using. + Returns: + Return a dict containing previously saved training info. + """ + if params.start_batch > 0: + filename = params.exp_dir / f"checkpoint-{params.start_batch}.pt" + elif params.start_epoch > 1: + filename = params.exp_dir / f"epoch-{params.start_epoch-1}.pt" + else: + return None + + assert filename.is_file(), f"{filename} does not exist!" + + saved_params = load_checkpoint( + filename, + model=model, + model_avg=model_avg, + optimizer=optimizer, + scheduler=scheduler, + ) + + keys = [ + "best_train_epoch", + "best_valid_epoch", + "batch_idx_train", + "best_train_loss", + "best_valid_loss", + ] + for k in keys: + params[k] = saved_params[k] + + if params.start_batch > 0: + if "cur_epoch" in saved_params: + params["start_epoch"] = saved_params["cur_epoch"] + + if "cur_batch_idx" in saved_params: + params["cur_batch_idx"] = saved_params["cur_batch_idx"] + + return saved_params + + +def save_checkpoint( + params: AttributeDict, + model: Union[nn.Module, DDP], + model_avg: Optional[nn.Module] = None, + optimizer: Optional[torch.optim.Optimizer] = None, + scheduler: Optional[LRSchedulerType] = None, + sampler: Optional[CutSampler] = None, + scaler: Optional[GradScaler] = None, + rank: int = 0, +) -> None: + """Save model, optimizer, scheduler and training stats to file. + + Args: + params: + It is returned by :func:`get_params`. + model: + The training model. + model_avg: + The stored model averaged from the start of training. + optimizer: + The optimizer used in the training. + sampler: + The sampler for the training dataset. + scaler: + The scaler used for mix precision training. + """ + if rank != 0: + return + filename = params.exp_dir / f"epoch-{params.cur_epoch}.pt" + save_checkpoint_impl( + filename=filename, + model=model, + model_avg=model_avg, + params=params, + optimizer=optimizer, + scheduler=scheduler, + sampler=sampler, + scaler=scaler, + rank=rank, + ) + + if params.best_train_epoch == params.cur_epoch: + best_train_filename = params.exp_dir / "best-train-loss.pt" + copyfile(src=filename, dst=best_train_filename) + + if params.best_valid_epoch == params.cur_epoch: + best_valid_filename = params.exp_dir / "best-valid-loss.pt" + copyfile(src=filename, dst=best_valid_filename) + + +def compute_loss( + params: AttributeDict, + model: Union[nn.Module, DDP], + sp: spm.SentencePieceProcessor, + batch: dict, + is_training: bool, + warmup: float = 1.0, +) -> Tuple[Tensor, MetricsTracker]: + """ + Compute RNN-T loss given the model and its inputs. + + Args: + params: + Parameters for training. See :func:`get_params`. + model: + The model for training. It is an instance of Conformer in our case. + batch: + A batch of data. See `lhotse.dataset.K2SpeechRecognitionDataset()` + for the content in it. + is_training: + True for training. False for validation. When it is True, this + function enables autograd during computation; when it is False, it + disables autograd. + warmup: a floating point value which increases throughout training; + values >= 1.0 are fully warmed up and have all modules present. + """ + device = model.device if isinstance(model, DDP) else next(model.parameters()).device + feature = batch["inputs"] + # at entry, feature is (N, T, C) + assert feature.ndim == 3 + feature = feature.to(device) + + supervisions = batch["supervisions"] + feature_lens = supervisions["num_frames"].to(device) + + texts = batch["supervisions"]["text"] + y = sp.encode(texts, out_type=int) + y = k2.RaggedTensor(y).to(device) + + with torch.set_grad_enabled(is_training): + simple_loss, pruned_loss = model( + x=feature, + x_lens=feature_lens, + y=y, + prune_range=params.prune_range, + am_scale=params.am_scale, + lm_scale=params.lm_scale, + warmup=warmup, + ) + # after the main warmup step, we keep pruned_loss_scale small + # for the same amount of time (model_warm_step), to avoid + # overwhelming the simple_loss and causing it to diverge, + # in case it had not fully learned the alignment yet. + pruned_loss_scale = ( + 0.0 if warmup < 1.0 else (0.1 if warmup > 1.0 and warmup < 2.0 else 1.0) + ) + loss = params.simple_loss_scale * simple_loss + pruned_loss_scale * pruned_loss + + assert loss.requires_grad == is_training + + info = MetricsTracker() + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + info["frames"] = (feature_lens // params.subsampling_factor).sum().item() + + # `utt_duration` and `utt_pad_proportion` would be normalized by `utterances` # noqa + info["utterances"] = feature.size(0) + # averaged input duration in frames over utterances + info["utt_duration"] = feature_lens.sum().item() + # averaged padding proportion over utterances + info["utt_pad_proportion"] = ( + ((feature.size(1) - feature_lens) / feature.size(1)).sum().item() + ) + + # Note: We use reduction=sum while computing the loss. + info["loss"] = loss.detach().cpu().item() + info["simple_loss"] = simple_loss.detach().cpu().item() + info["pruned_loss"] = pruned_loss.detach().cpu().item() + + return loss, info + + +def compute_validation_loss( + params: AttributeDict, + model: Union[nn.Module, DDP], + sp: spm.SentencePieceProcessor, + valid_dl: torch.utils.data.DataLoader, + world_size: int = 1, +) -> MetricsTracker: + """Run the validation process.""" + model.eval() + + tot_loss = MetricsTracker() + + for batch_idx, batch in enumerate(valid_dl): + loss, loss_info = compute_loss( + params=params, + model=model, + sp=sp, + batch=batch, + is_training=False, + ) + assert loss.requires_grad is False + tot_loss = tot_loss + loss_info + + if world_size > 1: + tot_loss.reduce(loss.device) + + loss_value = tot_loss["loss"] / tot_loss["frames"] + if loss_value < params.best_valid_loss: + params.best_valid_epoch = params.cur_epoch + params.best_valid_loss = loss_value + + return tot_loss + + +def train_one_epoch( + params: AttributeDict, + model: Union[nn.Module, DDP], + optimizer: torch.optim.Optimizer, + scheduler: LRSchedulerType, + sp: spm.SentencePieceProcessor, + train_dl: torch.utils.data.DataLoader, + valid_dl: torch.utils.data.DataLoader, + scaler: GradScaler, + model_avg: Optional[nn.Module] = None, + tb_writer: Optional[SummaryWriter] = None, + world_size: int = 1, + rank: int = 0, +) -> None: + """Train the model for one epoch. + + The training loss from the mean of all frames is saved in + `params.train_loss`. It runs the validation process every + `params.valid_interval` batches. + + Args: + params: + It is returned by :func:`get_params`. + model: + The model for training. + optimizer: + The optimizer we are using. + scheduler: + The learning rate scheduler, we call step() every step. + train_dl: + Dataloader for the training dataset. + valid_dl: + Dataloader for the validation dataset. + scaler: + The scaler used for mix precision training. + model_avg: + The stored model averaged from the start of training. + tb_writer: + Writer to write log messages to tensorboard. + world_size: + Number of nodes in DDP training. If it is 1, DDP is disabled. + rank: + The rank of the node in DDP training. If no DDP is used, it should + be set to 0. + """ + model.train() + + tot_loss = MetricsTracker() + + cur_batch_idx = params.get("cur_batch_idx", 0) + + for batch_idx, batch in enumerate(train_dl): + if batch_idx < cur_batch_idx: + continue + cur_batch_idx = batch_idx + + params.batch_idx_train += 1 + batch_size = len(batch["supervisions"]["text"]) + + with torch.cuda.amp.autocast(enabled=params.use_fp16): + loss, loss_info = compute_loss( + params=params, + model=model, + sp=sp, + batch=batch, + is_training=True, + warmup=(params.batch_idx_train / params.model_warm_step), + ) + # summary stats + tot_loss = (tot_loss * (1 - 1 / params.reset_interval)) + loss_info + + # NOTE: We use reduction==sum and loss is computed over utterances + # in the batch and there is no normalization to it so far. + scaler.scale(loss).backward() + scheduler.step_batch(params.batch_idx_train) + scaler.step(optimizer) + scaler.update() + optimizer.zero_grad() + + if params.print_diagnostics and batch_idx == 5: + return + + if ( + rank == 0 + and params.batch_idx_train > 0 + and params.batch_idx_train % params.average_period == 0 + ): + update_averaged_model( + params=params, + model_cur=model, + model_avg=model_avg, + ) + + if ( + params.batch_idx_train > 0 + and params.batch_idx_train % params.save_every_n == 0 + ): + params.cur_batch_idx = batch_idx + save_checkpoint_with_global_batch_idx( + out_dir=params.exp_dir, + global_batch_idx=params.batch_idx_train, + model=model, + model_avg=model_avg, + params=params, + optimizer=optimizer, + scheduler=scheduler, + sampler=train_dl.sampler, + scaler=scaler, + rank=rank, + ) + del params.cur_batch_idx + remove_checkpoints( + out_dir=params.exp_dir, + topk=params.keep_last_k, + rank=rank, + ) + + if batch_idx % params.log_interval == 0: + cur_lr = scheduler.get_last_lr()[0] + logging.info( + f"Epoch {params.cur_epoch}, " + f"batch {batch_idx}, loss[{loss_info}], " + f"tot_loss[{tot_loss}], batch size: {batch_size}, " + f"lr: {cur_lr:.2e}" + ) + + if tb_writer is not None: + tb_writer.add_scalar( + "train/learning_rate", cur_lr, params.batch_idx_train + ) + + loss_info.write_summary( + tb_writer, "train/current_", params.batch_idx_train + ) + tot_loss.write_summary(tb_writer, "train/tot_", params.batch_idx_train) + + if batch_idx > 0 and batch_idx % params.valid_interval == 0: + logging.info("Computing validation loss") + valid_info = compute_validation_loss( + params=params, + model=model, + sp=sp, + valid_dl=valid_dl, + world_size=world_size, + ) + model.train() + logging.info(f"Epoch {params.cur_epoch}, validation: {valid_info}") + if tb_writer is not None: + valid_info.write_summary( + tb_writer, "train/valid_", params.batch_idx_train + ) + + loss_value = tot_loss["loss"] / tot_loss["frames"] + params.train_loss = loss_value + if params.train_loss < params.best_train_loss: + params.best_train_epoch = params.cur_epoch + params.best_train_loss = params.train_loss + + +def run(rank, world_size, args): + """ + Args: + rank: + It is a value between 0 and `world_size-1`, which is + passed automatically by `mp.spawn()` in :func:`main`. + The node with rank 0 is responsible for saving checkpoint. + world_size: + Number of GPUs for DDP training. + args: + The return value of get_parser().parse_args() + """ + params = get_params() + params.update(vars(args)) + if params.full_libri is False: + params.valid_interval = 1600 + + fix_random_seed(params.seed) + if world_size > 1: + setup_dist(rank, world_size, params.master_port) + + setup_logger(f"{params.exp_dir}/log/log-train") + logging.info("Training started") + + if args.tensorboard and rank == 0: + tb_writer = SummaryWriter(log_dir=f"{params.exp_dir}/tensorboard") + else: + tb_writer = None + + device = torch.device("cpu") + if torch.cuda.is_available(): + device = torch.device("cuda", rank) + logging.info(f"Device: {device}") + + sp = spm.SentencePieceProcessor() + sp.load(params.bpe_model) + + # is defined in local/train_bpe_model.py + params.blank_id = sp.piece_to_id("") + params.vocab_size = sp.get_piece_size() + + logging.info(params) + + logging.info("About to create model") + model = get_transducer_model(params) + + num_param = sum([p.numel() for p in model.parameters()]) + logging.info(f"Number of model parameters: {num_param}") + + assert params.save_every_n >= params.average_period + model_avg: Optional[nn.Module] = None + if rank == 0: + # model_avg is only used with rank 0 + model_avg = copy.deepcopy(model) + + assert params.start_epoch > 0, params.start_epoch + checkpoints = load_checkpoint_if_available( + params=params, model=model, model_avg=model_avg + ) + + model.to(device) + if world_size > 1: + logging.info("Using DDP") + model = DDP(model, device_ids=[rank]) + + optimizer = Eve(model.parameters(), lr=params.initial_lr) + + scheduler = Eden(optimizer, params.lr_batches, params.lr_epochs) + + if checkpoints and "optimizer" in checkpoints: + logging.info("Loading optimizer state dict") + optimizer.load_state_dict(checkpoints["optimizer"]) + + if ( + checkpoints + and "scheduler" in checkpoints + and checkpoints["scheduler"] is not None + ): + logging.info("Loading scheduler state dict") + scheduler.load_state_dict(checkpoints["scheduler"]) + + if params.print_diagnostics: + opts = diagnostics.TensorDiagnosticOptions( + 2**22 + ) # allow 4 megabytes per sub-module + diagnostic = diagnostics.attach_diagnostics(model, opts) + + librispeech = LibriSpeechAsrDataModule(args) + + if params.full_libri: + train_cuts = librispeech.train_all_shuf_cuts() + else: + train_cuts = librispeech.train_clean_100_cuts() + + def remove_short_and_long_utt(c: Cut): + # Keep only utterances with duration between 1 second and 20 seconds + # + # Caution: There is a reason to select 20.0 here. Please see + # ../local/display_manifest_statistics.py + # + # You should use ../local/display_manifest_statistics.py to get + # an utterance duration distribution for your dataset to select + # the threshold + return 1.0 <= c.duration <= 20.0 + + train_cuts = train_cuts.filter(remove_short_and_long_utt) + + if params.start_batch > 0 and checkpoints and "sampler" in checkpoints: + # We only load the sampler's state dict when it loads a checkpoint + # saved in the middle of an epoch + sampler_state_dict = checkpoints["sampler"] + else: + sampler_state_dict = None + + train_dl = librispeech.train_dataloaders( + train_cuts, sampler_state_dict=sampler_state_dict + ) + + valid_cuts = librispeech.dev_clean_cuts() + valid_cuts += librispeech.dev_other_cuts() + valid_dl = librispeech.valid_dataloaders(valid_cuts) + + if not params.print_diagnostics: + scan_pessimistic_batches_for_oom( + model=model, + train_dl=train_dl, + optimizer=optimizer, + sp=sp, + params=params, + ) + + scaler = GradScaler(enabled=params.use_fp16) + if checkpoints and "grad_scaler" in checkpoints: + logging.info("Loading grad scaler state dict") + scaler.load_state_dict(checkpoints["grad_scaler"]) + + for epoch in range(params.start_epoch, params.num_epochs + 1): + scheduler.step_epoch(epoch - 1) + fix_random_seed(params.seed + epoch - 1) + train_dl.sampler.set_epoch(epoch - 1) + + if tb_writer is not None: + tb_writer.add_scalar("train/epoch", epoch, params.batch_idx_train) + + params.cur_epoch = epoch + + train_one_epoch( + params=params, + model=model, + model_avg=model_avg, + optimizer=optimizer, + scheduler=scheduler, + sp=sp, + train_dl=train_dl, + valid_dl=valid_dl, + scaler=scaler, + tb_writer=tb_writer, + world_size=world_size, + rank=rank, + ) + + if params.print_diagnostics: + diagnostic.print_diagnostics() + break + + save_checkpoint( + params=params, + model=model, + model_avg=model_avg, + optimizer=optimizer, + scheduler=scheduler, + sampler=train_dl.sampler, + scaler=scaler, + rank=rank, + ) + + logging.info("Done!") + + if world_size > 1: + torch.distributed.barrier() + cleanup_dist() + + +def scan_pessimistic_batches_for_oom( + model: Union[nn.Module, DDP], + train_dl: torch.utils.data.DataLoader, + optimizer: torch.optim.Optimizer, + sp: spm.SentencePieceProcessor, + params: AttributeDict, +): + from lhotse.dataset import find_pessimistic_batches + + logging.info( + "Sanity check -- see if any of the batches in epoch 1 would cause OOM." + ) + batches, crit_values = find_pessimistic_batches(train_dl.sampler) + for criterion, cuts in batches.items(): + batch = train_dl.dataset[cuts] + try: + # warmup = 0.0 is so that the derivs for the pruned loss stay zero + # (i.e. are not remembered by the decaying-average in adam), because + # we want to avoid these params being subject to shrinkage in adam. + with torch.cuda.amp.autocast(enabled=params.use_fp16): + loss, _ = compute_loss( + params=params, + model=model, + sp=sp, + batch=batch, + is_training=True, + warmup=0.0, + ) + loss.backward() + optimizer.step() + optimizer.zero_grad() + except RuntimeError as e: + if "CUDA out of memory" in str(e): + logging.error( + "Your GPU ran out of memory with the current " + "max_duration setting. We recommend decreasing " + "max_duration and trying again.\n" + f"Failing criterion: {criterion} " + f"(={crit_values[criterion]}) ..." + ) + raise + + +def main(): + parser = get_parser() + LibriSpeechAsrDataModule.add_arguments(parser) + args = parser.parse_args() + args.exp_dir = Path(args.exp_dir) + + world_size = args.world_size + assert world_size >= 1 + if world_size > 1: + mp.spawn(run, args=(world_size, args), nprocs=world_size, join=True) + else: + run(rank=0, world_size=1, args=args) + + +torch.set_num_threads(1) +torch.set_num_interop_threads(1) + +if __name__ == "__main__": + main()