# Copyright    2021  Xiaomi Corp.        (authors: Fangjun Kuang
#                                                  Xiaoyu Yang)
#
# See ../../../../LICENSE for clarification regarding multiple authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Union

import k2
import torch

from icefall.decode import one_best_decoding
from icefall.utils import DecodingResults, get_texts, get_texts_with_timestamp


def fast_beam_search(
    model: torch.nn.Module,
    decoding_graph: k2.Fsa,
    encoder_out: torch.Tensor,
    encoder_out_lens: torch.Tensor,
    beam: float,
    max_states: int,
    max_contexts: int,
    temperature: float = 1.0,
) -> k2.Fsa:
    """It limits the maximum number of symbols per frame to 1.

    Args:
      model:
        An instance of `Transducer`.
      decoding_graph:
        Decoding graph used for decoding, may be a TrivialGraph or a LG.
      encoder_out:
        A tensor of shape (N, T, C) from the encoder.
      encoder_out_lens:
        A tensor of shape (N,) containing the number of frames in `encoder_out`
        before padding.
      beam:
        Beam value, similar to the beam used in Kaldi.
      max_states:
        Max states per stream per frame.
      max_contexts:
        Max contexts per stream per frame.
      temperature:
        Softmax temperature.
    Returns:
      Return an FsaVec with axes [utt][state][arc] containing the decoded
      lattice. Note: When the input graph is a TrivialGraph, the returned
      lattice is actually an acceptor.
    """
    assert encoder_out.ndim == 3

    context_size = model.decoder.context_size
    vocab_size = model.decoder.vocab_size

    B, T, _ = encoder_out.shape

    config = k2.RnntDecodingConfig(
        vocab_size=vocab_size,
        decoder_history_len=context_size,
        beam=beam,
        max_contexts=max_contexts,
        max_states=max_states,
    )
    # One decoding stream per utterance; all of them share the same graph.
    individual_streams = [k2.RnntDecodingStream(decoding_graph) for _ in range(B)]
    decoding_streams = k2.RnntDecodingStreams(individual_streams, config)

    # Project once up front so that the joiner can be called with
    # project_input=False inside the loop.
    encoder_out = model.joiner.encoder_proj(encoder_out)

    for t in range(T):
        # shape is a RaggedShape of shape (B, context)
        # contexts is a Tensor of shape (shape.NumElements(), context_size)
        shape, contexts = decoding_streams.get_contexts()
        # `nn.Embedding()` in torch below v1.7.1 supports only torch.int64
        contexts = contexts.to(torch.int64)
        # decoder_out is of shape (shape.NumElements(), 1, decoder_out_dim)
        decoder_out = model.decoder(contexts, need_pad=False)
        decoder_out = model.joiner.decoder_proj(decoder_out)
        # current_encoder_out is of shape
        # (shape.NumElements(), 1, joiner_dim)
        # fmt: off
        current_encoder_out = torch.index_select(
            encoder_out[:, t:t + 1, :], 0, shape.row_ids(1).to(torch.int64)
        )
        # fmt: on
        logits = model.joiner(
            current_encoder_out.unsqueeze(2),
            decoder_out.unsqueeze(1),
            project_input=False,
        )
        logits = logits.squeeze(1).squeeze(1)
        log_probs = (logits / temperature).log_softmax(dim=-1)
        decoding_streams.advance(log_probs)

    decoding_streams.terminate_and_flush_to_streams()
    lattice = decoding_streams.format_output(encoder_out_lens.tolist())

    return lattice


def fast_beam_search_one_best(
    model: torch.nn.Module,
    decoding_graph: k2.Fsa,
    encoder_out: torch.Tensor,
    encoder_out_lens: torch.Tensor,
    beam: float,
    max_states: int,
    max_contexts: int,
    temperature: float = 1.0,
    return_timestamps: bool = False,
) -> Union[List[List[int]], DecodingResults]:
    """It limits the maximum number of symbols per frame to 1.

    A lattice is first obtained using fast beam search, and then
    the shortest path within the lattice is used as the final output.

    Args:
      model:
        An instance of `Transducer`.
      decoding_graph:
        Decoding graph used for decoding, may be a TrivialGraph or a LG.
      encoder_out:
        A tensor of shape (N, T, C) from the encoder.
      encoder_out_lens:
        A tensor of shape (N,) containing the number of frames in `encoder_out`
        before padding.
      beam:
        Beam value, similar to the beam used in Kaldi.
      max_states:
        Max states per stream per frame.
      max_contexts:
        Max contexts per stream per frame.
      temperature:
        Softmax temperature.
      return_timestamps:
        Whether to return timestamps.
    Returns:
      If return_timestamps is False, return the decoded result.
      Else, return a DecodingResults object containing
      decoded result and corresponding timestamps.
    """
    lattice = fast_beam_search(
        model=model,
        decoding_graph=decoding_graph,
        encoder_out=encoder_out,
        encoder_out_lens=encoder_out_lens,
        beam=beam,
        max_states=max_states,
        max_contexts=max_contexts,
        temperature=temperature,
    )

    best_path = one_best_decoding(lattice)

    if not return_timestamps:
        return get_texts(best_path)
    else:
        return get_texts_with_timestamp(best_path)


def greedy_search_batch(
    model: torch.nn.Module,
    encoder_out: torch.Tensor,
    encoder_out_lens: torch.Tensor,
    return_timestamps: bool = False,
) -> Union[List[List[int]], DecodingResults]:
    """Greedy search in batch mode. It hardcodes --max-sym-per-frame=1.

    Args:
      model:
        The transducer model.
      encoder_out:
        Output from the encoder. Its shape is (N, T, C), where N >= 1.
      encoder_out_lens:
        A 1-D tensor of shape (N,), containing number of valid frames in
        encoder_out before padding.
      return_timestamps:
        Whether to return timestamps.
    Returns:
      If return_timestamps is False, return the decoded result.
      Else, return a DecodingResults object containing
      decoded result and corresponding timestamps.
    """
    assert encoder_out.ndim == 3
    assert encoder_out.size(0) >= 1, encoder_out.size(0)

    # Packing sorts the utterances by length (longest first), so that at
    # frame t only the first batch_size_list[t] utterances are still active.
    packed_encoder_out = torch.nn.utils.rnn.pack_padded_sequence(
        input=encoder_out,
        lengths=encoder_out_lens.cpu(),
        batch_first=True,
        enforce_sorted=False,
    )

    device = next(model.parameters()).device

    blank_id = model.decoder.blank_id
    # Fall back to blank_id when the model does not define `unk_id`.
    unk_id = getattr(model, "unk_id", blank_id)
    context_size = model.decoder.context_size

    batch_size_list = packed_encoder_out.batch_sizes.tolist()
    N = encoder_out.size(0)
    assert torch.all(encoder_out_lens > 0), encoder_out_lens
    assert N == batch_size_list[0], (N, batch_size_list)

    hyps = [[-1] * (context_size - 1) + [blank_id] for _ in range(N)]

    # timestamps[n][i] is the frame index after subsampling
    # on which hyps[n][i] is decoded
    timestamps = [[] for _ in range(N)]
    # scores[n][i] is the log-probability (log_softmax output) of the token
    # emitted as the i-th symbol of utterance n
    scores = [[] for _ in range(N)]

    decoder_input = torch.tensor(
        hyps,
        device=device,
        dtype=torch.int64,
    )  # (N, context_size)

    decoder_out = model.decoder(decoder_input, need_pad=False)
    decoder_out = model.joiner.decoder_proj(decoder_out)
    # decoder_out: (N, 1, decoder_out_dim)

    encoder_out = model.joiner.encoder_proj(packed_encoder_out.data)

    offset = 0
    for (t, batch_size) in enumerate(batch_size_list):
        start = offset
        end = offset + batch_size
        current_encoder_out = encoder_out.data[start:end]
        current_encoder_out = current_encoder_out.unsqueeze(1).unsqueeze(1)
        # current_encoder_out's shape: (batch_size, 1, 1, encoder_out_dim)
        offset = end

        # Drop the decoder states of utterances that have already ended.
        decoder_out = decoder_out[:batch_size]

        logits = model.joiner(
            current_encoder_out, decoder_out.unsqueeze(1), project_input=False
        )
        # logits' shape: (batch_size, 1, 1, vocab_size)

        logits = logits.squeeze(1).squeeze(1)  # (batch_size, vocab_size)
        log_probs = logits.log_softmax(dim=-1)
        assert log_probs.ndim == 2, log_probs.shape
        y = log_probs.argmax(dim=1).tolist()
        emitted = False
        for i, v in enumerate(y):
            if v not in (blank_id, unk_id):
                hyps[i].append(v)
                timestamps[i].append(t)
                scores[i].append(log_probs[i, v].item())
                emitted = True
        if emitted:
            # update decoder output
            decoder_input = [h[-context_size:] for h in hyps[:batch_size]]
            decoder_input = torch.tensor(
                decoder_input,
                device=device,
                dtype=torch.int64,
            )
            decoder_out = model.decoder(decoder_input, need_pad=False)
            decoder_out = model.joiner.decoder_proj(decoder_out)

    # Strip the initial context and restore the caller's utterance order.
    sorted_ans = [h[context_size:] for h in hyps]
    unsorted_indices = packed_encoder_out.unsorted_indices.tolist()
    ans = [sorted_ans[idx] for idx in unsorted_indices]
    ans_timestamps = [timestamps[idx] for idx in unsorted_indices]
    ans_scores = [scores[idx] for idx in unsorted_indices]

    if not return_timestamps:
        return ans
    else:
        return DecodingResults(
            hyps=ans,
            timestamps=ans_timestamps,
            scores=ans_scores,
        )


@dataclass
class Hypothesis:
    # The predicted tokens so far.
    # Newly predicted tokens are appended to `ys`.
    ys: List[int]

    # The log prob of ys.
    # It contains only one entry.
    log_prob: torch.Tensor

    # timestamp[i] is the frame index after subsampling
    # on which ys[i] is decoded
    timestamp: List[int] = field(default_factory=list)

    @property
    def key(self) -> str:
        """Return a string representation of self.ys"""
        return "_".join(map(str, self.ys))


class HypothesisList(object):
    def __init__(self, data: Optional[Dict[str, Hypothesis]] = None) -> None:
        """
        Args:
          data:
            A dict of Hypotheses. Its key is its `value.key`.
        """
        if data is None:
            self._data = {}
        else:
            self._data = data

    @property
    def data(self) -> Dict[str, Hypothesis]:
        return self._data

    def add(self, hyp: Hypothesis) -> None:
        """Add a Hypothesis to `self`.

        If `hyp` already exists in `self`, its probability is updated using
        `log-sum-exp` with the existing one.

        Args:
          hyp:
            The hypothesis to be added.
        """
        key = hyp.key
        if key in self:
            old_hyp = self._data[key]  # shallow copy
            # The stored tensor is updated in place via `out=`.
            torch.logaddexp(old_hyp.log_prob, hyp.log_prob, out=old_hyp.log_prob)
        else:
            self._data[key] = hyp

    def get_most_probable(self, length_norm: bool = False) -> Hypothesis:
        """Get the most probable hypothesis, i.e., the one with
        the largest `log_prob`.

        Args:
          length_norm:
            If True, the `log_prob` of a hypothesis is normalized by the
            number of tokens in it.
        Returns:
          Return the hypothesis that has the largest `log_prob`.
        """
        if length_norm:
            return max(self._data.values(), key=lambda hyp: hyp.log_prob / len(hyp.ys))
        else:
            return max(self._data.values(), key=lambda hyp: hyp.log_prob)

    def remove(self, hyp: Hypothesis) -> None:
        """Remove a given hypothesis.

        Caution:
          `self` is modified **in-place**.

        Args:
          hyp:
            The hypothesis to be removed from `self`.
            Note: It must be contained in `self`. Otherwise,
            an exception is raised.
        """
        key = hyp.key
        assert key in self, f"{key} does not exist"
        del self._data[key]

    def filter(self, threshold: torch.Tensor) -> "HypothesisList":
        """Keep only the hypotheses whose log_prob is greater than threshold.

        Caution:
          `self` is not modified. Instead, a new HypothesisList is returned.

        Args:
          threshold:
            Hypotheses whose `log_prob` is not strictly greater than this
            value are left out of the result.
        Returns:
          Return a new HypothesisList containing all hypotheses from `self`
          with `log_prob` being greater than the given `threshold`.
        """
        ans = HypothesisList()
        for hyp in self._data.values():
            if hyp.log_prob > threshold:
                ans.add(hyp)  # shallow copy
        return ans

    def topk(self, k: int) -> "HypothesisList":
        """Return the top-k hypothesis."""
        hyps = list(self._data.items())

        hyps = sorted(hyps, key=lambda h: h[1].log_prob, reverse=True)[:k]

        ans = HypothesisList(dict(hyps))
        return ans

    def __contains__(self, key: str):
        return key in self._data

    def __iter__(self):
        return iter(self._data.values())

    def __len__(self) -> int:
        return len(self._data)

    def __str__(self) -> str:
        # `__iter__` yields Hypothesis objects, not keys, so the keys are
        # taken from the underlying dict; joining the hypotheses themselves
        # would raise a TypeError.
        return ", ".join(self._data)


def get_hyps_shape(hyps: List[HypothesisList]) -> k2.RaggedShape:
    """Return a ragged shape with axes [utt][num_hyps].

    Args:
      hyps:
        len(hyps) == batch_size. It contains the current hypothesis for
        each utterance in the batch.
    Returns:
      Return a ragged shape with 2 axes [utt][num_hyps]. Note that
      the shape is on CPU.
    """
    num_hyps = [len(h) for h in hyps]

    # torch.cumsum() is inclusive sum, so we put a 0 at the beginning
    # to get exclusive sum later.
    num_hyps.insert(0, 0)

    num_hyps = torch.tensor(num_hyps)
    row_splits = torch.cumsum(num_hyps, dim=0, dtype=torch.int32)
    ans = k2.ragged.create_ragged_shape2(
        row_splits=row_splits, cached_tot_size=row_splits[-1].item()
    )
    return ans


def modified_beam_search(
    model: torch.nn.Module,
    encoder_out: torch.Tensor,
    encoder_out_lens: torch.Tensor,
    beam: int = 4,
    temperature: float = 1.0,
    return_timestamps: bool = False,
) -> Union[List[List[int]], DecodingResults]:
    """Beam search in batch mode with --max-sym-per-frame=1 being hardcoded.

    Args:
      model:
        The transducer model.
      encoder_out:
        Output from the encoder. Its shape is (N, T, C).
      encoder_out_lens:
        A 1-D tensor of shape (N,), containing number of valid frames in
        encoder_out before padding.
      beam:
        Number of active paths during the beam search.
      temperature:
        Softmax temperature.
      return_timestamps:
        Whether to return timestamps.
    Returns:
      If return_timestamps is False, return the decoded result.
      Else, return a DecodingResults object containing
      decoded result and corresponding timestamps.
    """
    assert encoder_out.ndim == 3, encoder_out.shape
    assert encoder_out.size(0) >= 1, encoder_out.size(0)

    # Packing sorts the utterances by length (longest first), so that at
    # frame t only the first batch_size_list[t] utterances are still active.
    packed_encoder_out = torch.nn.utils.rnn.pack_padded_sequence(
        input=encoder_out,
        lengths=encoder_out_lens.cpu(),
        batch_first=True,
        enforce_sorted=False,
    )

    blank_id = model.decoder.blank_id
    # Fall back to blank_id when the model does not define `unk_id`.
    unk_id = getattr(model, "unk_id", blank_id)
    context_size = model.decoder.context_size
    device = next(model.parameters()).device

    batch_size_list = packed_encoder_out.batch_sizes.tolist()
    N = encoder_out.size(0)
    assert torch.all(encoder_out_lens > 0), encoder_out_lens
    assert N == batch_size_list[0], (N, batch_size_list)

    B = [HypothesisList() for _ in range(N)]
    for i in range(N):
        B[i].add(
            Hypothesis(
                ys=[blank_id] * context_size,
                log_prob=torch.zeros(1, dtype=torch.float32, device=device),
                timestamp=[],
            )
        )

    encoder_out = model.joiner.encoder_proj(packed_encoder_out.data)

    offset = 0
    finalized_B = []
    for (t, batch_size) in enumerate(batch_size_list):
        start = offset
        end = offset + batch_size
        current_encoder_out = encoder_out.data[start:end]
        current_encoder_out = current_encoder_out.unsqueeze(1).unsqueeze(1)
        # current_encoder_out's shape is (batch_size, 1, 1, encoder_out_dim)
        offset = end

        # Utterances that ended before this frame are moved to the front of
        # finalized_B, which keeps the overall (sorted) order intact.
        finalized_B = B[batch_size:] + finalized_B
        B = B[:batch_size]

        hyps_shape = get_hyps_shape(B).to(device)

        A = [list(b) for b in B]
        B = [HypothesisList() for _ in range(batch_size)]

        ys_log_probs = torch.cat(
            [hyp.log_prob.reshape(1, 1) for hyps in A for hyp in hyps]
        )  # (num_hyps, 1)

        decoder_input = torch.tensor(
            [hyp.ys[-context_size:] for hyps in A for hyp in hyps],
            device=device,
            dtype=torch.int64,
        )  # (num_hyps, context_size)

        decoder_out = model.decoder(decoder_input, need_pad=False).unsqueeze(1)
        decoder_out = model.joiner.decoder_proj(decoder_out)
        # decoder_out is of shape (num_hyps, 1, 1, joiner_dim)

        # Note: For torch 1.7.1 and below, it requires a torch.int64 tensor
        # as index, so we use `to(torch.int64)` below.
        current_encoder_out = torch.index_select(
            current_encoder_out,
            dim=0,
            index=hyps_shape.row_ids(1).to(torch.int64),
        )  # (num_hyps, 1, 1, encoder_out_dim)

        logits = model.joiner(
            current_encoder_out,
            decoder_out,
            project_input=False,
        )  # (num_hyps, 1, 1, vocab_size)

        logits = logits.squeeze(1).squeeze(1)  # (num_hyps, vocab_size)

        log_probs = (logits / temperature).log_softmax(dim=-1)  # (num_hyps, vocab_size)

        # Accumulate the score of each hypothesis' prefix.
        log_probs.add_(ys_log_probs)

        vocab_size = log_probs.size(-1)

        log_probs = log_probs.reshape(-1)

        # Each hypothesis contributes vocab_size candidates, so scaling the
        # row splits groups the flattened scores by utterance.
        row_splits = hyps_shape.row_splits(1) * vocab_size
        log_probs_shape = k2.ragged.create_ragged_shape2(
            row_splits=row_splits, cached_tot_size=log_probs.numel()
        )
        ragged_log_probs = k2.RaggedTensor(shape=log_probs_shape, value=log_probs)

        for i in range(batch_size):
            topk_log_probs, topk_indexes = ragged_log_probs[i].topk(beam)

            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                topk_hyp_indexes = (topk_indexes // vocab_size).tolist()
                topk_token_indexes = (topk_indexes % vocab_size).tolist()

            for k in range(len(topk_hyp_indexes)):
                hyp_idx = topk_hyp_indexes[k]
                hyp = A[i][hyp_idx]
                new_ys = hyp.ys[:]
                new_token = topk_token_indexes[k]
                new_timestamp = hyp.timestamp[:]
                if new_token not in (blank_id, unk_id):
                    new_ys.append(new_token)
                    new_timestamp.append(t)

                new_log_prob = topk_log_probs[k]
                new_hyp = Hypothesis(
                    ys=new_ys, log_prob=new_log_prob, timestamp=new_timestamp
                )
                B[i].add(new_hyp)

    B = B + finalized_B
    best_hyps = [b.get_most_probable(length_norm=True) for b in B]

    # Strip the initial context and restore the caller's utterance order.
    sorted_ans = [h.ys[context_size:] for h in best_hyps]
    sorted_timestamps = [h.timestamp for h in best_hyps]
    unsorted_indices = packed_encoder_out.unsorted_indices.tolist()
    ans = [sorted_ans[idx] for idx in unsorted_indices]
    ans_timestamps = [sorted_timestamps[idx] for idx in unsorted_indices]

    if not return_timestamps:
        return ans
    else:
        return DecodingResults(
            hyps=ans,
            timestamps=ans_timestamps,
        )