# Copyright 2021 Piotr Żelasko
#           2021 Xiaomi Corp. (authors: Fangjun Kuang)
#
# See ../../../../LICENSE for clarification regarding multiple authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import argparse
import logging
from functools import lru_cache
from pathlib import Path

from lhotse import CutSet, Fbank, FbankConfig, load_manifest_lazy
from lhotse.dataset import (
    CutConcatenate,
    DynamicBucketingSampler,
    K2SpeechRecognitionDataset,
    PrecomputedFeatures,
    SingleCutSampler,
)
from lhotse.dataset.input_strategies import OnTheFlyFeatures
from torch.utils.data import DataLoader

from icefall.dataset.datamodule import DataModule
from icefall.utils import str2bool


class YesNoAsrDataModule(DataModule):
    """
    DataModule for k2 ASR experiments.
    It assumes there is always one train dataloader,
    but there can be multiple test dataloaders (e.g. LibriSpeech test-clean
    and test-other).

    It contains all the common data pipeline modules used in ASR
    experiments, e.g.:
    - dynamic batch size,
    - bucketing samplers,
    - cut concatenation,
    - augmentation,
    - on-the-fly feature extraction
    """

    @classmethod
    def add_arguments(cls, parser: argparse.ArgumentParser):
        super().add_arguments(parser)
        group = parser.add_argument_group(
            title="ASR data related options",
            description="These options are used for the preparation of "
            "PyTorch DataLoaders from Lhotse CutSet's -- they control the "
            "effective batch sizes, sampling strategies, applied data "
            "augmentations, etc.",
        )
        group.add_argument(
            "--feature-dir",
            type=Path,
            default=Path("data/fbank"),
            help="Path to directory with train/test cuts.",
        )
        group.add_argument(
            "--max-duration",
            type=float,
            default=30.0,
            help="Maximum pooled recordings duration (seconds) in a "
            "single batch. You can reduce it if it causes CUDA OOM.",
        )
        group.add_argument(
            "--bucketing-sampler",
            type=str2bool,
            default=False,
            help="When enabled, the batches will come from buckets of "
            "similar duration (saves padding frames).",
        )
        group.add_argument(
            "--num-buckets",
            type=int,
            default=10,
            help="The number of buckets for the DynamicBucketingSampler "
            "(you might want to increase it for larger datasets).",
        )
        group.add_argument(
            "--concatenate-cuts",
            type=str2bool,
            default=False,
            help="When enabled, utterances (cuts) will be concatenated "
            "to minimize the amount of padding.",
        )
        group.add_argument(
            "--duration-factor",
            type=float,
            default=1.0,
            help="Determines the maximum duration of a concatenated cut "
            "relative to the duration of the longest cut in a batch.",
        )
        group.add_argument(
            "--gap",
            type=float,
            default=1.0,
            help="The amount of padding (in seconds) inserted between "
            "concatenated cuts. This padding is filled with noise when "
            "noise augmentation is used.",
        )
        group.add_argument(
            "--on-the-fly-feats",
            type=str2bool,
            default=False,
            help="When enabled, use on-the-fly cut mixing and feature "
            "extraction. Will drop existing precomputed feature manifests "
            "if available.",
        )
        group.add_argument(
            "--shuffle",
            type=str2bool,
            default=True,
            help="When enabled (=default), the examples will be "
            "shuffled for each epoch.",
        )
        group.add_argument(
            "--return-cuts",
            type=str2bool,
            default=True,
            help="When enabled, each batch will have the "
            "field: batch['supervisions']['cut'] with the cuts that "
            "were used to construct it.",
        )
        group.add_argument(
            "--num-workers",
            type=int,
            default=2,
            help="The number of training dataloader workers that "
            "collect the batches.",
        )

    def train_dataloaders(self) -> DataLoader:
        logging.info("About to get train cuts")
        cuts_train = self.train_cuts()

        logging.info("About to create train dataset")
        transforms = []
        if self.args.concatenate_cuts:
            logging.info(
                f"Using cut concatenation with duration factor "
                f"{self.args.duration_factor} and gap {self.args.gap}."
            )
            # Cut concatenation should be the first transform in the list,
            # so that if we e.g. mix noise in, it will fill the gaps between
            # different utterances.
            transforms = [
                CutConcatenate(
                    duration_factor=self.args.duration_factor, gap=self.args.gap
                )
            ] + transforms

        train = K2SpeechRecognitionDataset(
            cut_transforms=transforms,
            return_cuts=self.args.return_cuts,
        )

        if self.args.on_the_fly_feats:
            # NOTE: the PerturbSpeed transform should be added only if we
            # remove it from the data prep stage.
            # Add on-the-fly speed perturbation; since originally it would
            # have increased epoch size by 3, we will apply prob 2/3 and use
            # 3x more epochs.
            # Speed perturbation probably should come first before
            # concatenation, but in principle the transforms order doesn't
            # have to be strict (e.g. could be randomized).
            # transforms = [PerturbSpeed(factors=[0.9, 1.1], p=2/3)] + transforms  # noqa
            # Drop feats to be on the safe side.
            train = K2SpeechRecognitionDataset(
                cut_transforms=transforms,
                input_strategy=OnTheFlyFeatures(
                    Fbank(FbankConfig(sampling_rate=8000, num_mel_bins=23))
                ),
                return_cuts=self.args.return_cuts,
            )

        if self.args.bucketing_sampler:
            logging.info("Using DynamicBucketingSampler.")
            train_sampler = DynamicBucketingSampler(
                cuts_train,
                max_duration=self.args.max_duration,
                shuffle=self.args.shuffle,
                num_buckets=self.args.num_buckets,
                drop_last=True,
            )
        else:
            logging.info("Using SingleCutSampler.")
            train_sampler = SingleCutSampler(
                cuts_train,
                max_duration=self.args.max_duration,
                shuffle=self.args.shuffle,
            )

        logging.info("About to create train dataloader")
        train_dl = DataLoader(
            train,
            sampler=train_sampler,
            batch_size=None,
            num_workers=self.args.num_workers,
            persistent_workers=False,
        )
        return train_dl

    def test_dataloaders(self) -> DataLoader:
        logging.info("About to get test cuts")
        cuts_test = self.test_cuts()

        logging.debug("About to create test dataset")
        test = K2SpeechRecognitionDataset(
            # The yesno recordings are 8 kHz, so configure the extractor
            # to match the train dataloader above.
            input_strategy=OnTheFlyFeatures(
                Fbank(FbankConfig(sampling_rate=8000, num_mel_bins=23))
            )
            if self.args.on_the_fly_feats
            else PrecomputedFeatures(),
            return_cuts=self.args.return_cuts,
        )
        sampler = DynamicBucketingSampler(
            cuts_test,
            max_duration=self.args.max_duration,
            shuffle=False,
        )
        logging.debug("About to create test dataloader")
        test_dl = DataLoader(
            test,
            batch_size=None,
            sampler=sampler,
            num_workers=self.args.num_workers,
        )
        return test_dl

    @lru_cache()
    def train_cuts(self) -> CutSet:
        logging.info("About to get train cuts")
        cuts_train = load_manifest_lazy(
            self.args.feature_dir / "yesno_cuts_train.jsonl.gz"
        )
        return cuts_train

    @lru_cache()
    def test_cuts(self) -> CutSet:
        logging.info("About to get test cuts")
        cuts_test = load_manifest_lazy(
            self.args.feature_dir / "yesno_cuts_test.jsonl.gz"
        )
        return cuts_test
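

# A minimal sanity-check sketch, assuming (as the `self.args` accesses above
# suggest) that the DataModule base class stores the parsed argparse namespace
# on construction. `_sanity_check` is an illustrative helper, not part of the
# recipe: it builds the training dataloader and prints the feature shape of
# one batch, provided the cut manifests under --feature-dir have already been
# prepared.
def _sanity_check():
    parser = argparse.ArgumentParser()
    YesNoAsrDataModule.add_arguments(parser)
    args = parser.parse_args()

    data_module = YesNoAsrDataModule(args)
    train_dl = data_module.train_dataloaders()
    for batch in train_dl:
        # K2SpeechRecognitionDataset yields dict batches with "inputs"
        # (a padded feature tensor) and "supervisions".
        print(batch["inputs"].shape)
        break


if __name__ == "__main__":
    _sanity_check()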