diff --git a/egs/librispeech/ASR/conformer_ctc/asr_datamodule.py b/egs/librispeech/ASR/conformer_ctc/asr_datamodule.py deleted file mode 100644 index b3bd823ff..000000000 --- a/egs/librispeech/ASR/conformer_ctc/asr_datamodule.py +++ /dev/null @@ -1,362 +0,0 @@ -import argparse -import logging -from functools import lru_cache -from pathlib import Path -from typing import List, Union - -from lhotse import CutSet, Fbank, FbankConfig, load_manifest -from lhotse.dataset import ( - BucketingSampler, - CutConcatenate, - CutMix, - K2SpeechRecognitionDataset, - PrecomputedFeatures, - SingleCutSampler, - SpecAugment, -) -from lhotse.dataset.dataloading import LhotseDataLoader -from lhotse.dataset.input_strategies import OnTheFlyFeatures -from torch.utils.data import DataLoader - -from icefall.dataset.datamodule import DataModule -from icefall.utils import str2bool - - -class LibriSpeechAsrDataModule(DataModule): - """ - DataModule for K2 ASR experiments. - It assumes there is always one train and valid dataloader, - but there can be multiple test dataloaders (e.g. LibriSpeech test-clean - and test-other). - - It contains all the common data pipeline modules used in ASR - experiments, e.g.: - - dynamic batch size, - - bucketing samplers, - - cut concatenation, - - augmentation, - - on-the-fly feature extraction - - This class should be derived for specific corpora used in ASR tasks. - """ - - @classmethod - def add_arguments(cls, parser: argparse.ArgumentParser): - super().add_arguments(parser) - group = parser.add_argument_group( - title="ASR data related options", - description="These options are used for the preparation of " - "PyTorch DataLoaders from Lhotse CutSet's -- they control the " - "effective batch sizes, sampling strategies, applied data " - "augmentations, etc.", - ) - group.add_argument( - "--full-libri", - type=str2bool, - default=True, - help="When enabled, use 960h LibriSpeech. " - "Otherwise, use 100h subset.", - ) - group.add_argument( - "--feature-dir", - type=Path, - default=Path("data/fbank"), - help="Path to directory with train/valid/test cuts.", - ) - group.add_argument( - "--max-duration", - type=int, - default=500.0, - help="Maximum pooled recordings duration (seconds) in a " - "single batch. You can reduce it if it causes CUDA OOM.", - ) - group.add_argument( - "--bucketing-sampler", - type=str2bool, - default=False, - help="When enabled, the batches will come from buckets of " - "similar duration (saves padding frames).", - ) - group.add_argument( - "--num-buckets", - type=int, - default=30, - help="The number of buckets for the BucketingSampler" - "(you might want to increase it for larger datasets).", - ) - group.add_argument( - "--concatenate-cuts", - type=str2bool, - default=True, - help="When enabled, utterances (cuts) will be concatenated " - "to minimize the amount of padding.", - ) - group.add_argument( - "--duration-factor", - type=float, - default=1.0, - help="Determines the maximum duration of a concatenated cut " - "relative to the duration of the longest cut in a batch.", - ) - group.add_argument( - "--gap", - type=float, - default=1.0, - help="The amount of padding (in seconds) inserted between " - "concatenated cuts. This padding is filled with noise when " - "noise augmentation is used.", - ) - group.add_argument( - "--on-the-fly-feats", - type=str2bool, - default=False, - help="When enabled, use on-the-fly cut mixing and feature " - "extraction. Will drop existing precomputed feature manifests " - "if available.", - ) - group.add_argument( - "--shuffle", - type=str2bool, - default=True, - help="When enabled (=default), the examples will be " - "shuffled for each epoch.", - ) - group.add_argument( - "--return-cuts", - type=str2bool, - default=True, - help="When enabled, each batch will have the " - "field: batch['supervisions']['cut'] with the cuts that " - "were used to construct it.", - ) - - group.add_argument( - "--num-workers", - type=int, - default=2, - help="The number of training dataloader workers that " - "collect the batches.", - ) - - group.add_argument( - "--num-workers-inner", - type=int, - default=8, - help="The number of sub-workers (replicated for each of " - "training dataloader workers) that parallelize " - "the I/O to collect each batch.", - ) - - def train_dataloaders(self) -> DataLoader: - logging.info("About to get train cuts") - cuts_train = self.train_cuts() - - logging.info("About to get Musan cuts") - cuts_musan = load_manifest(self.args.feature_dir / "cuts_musan.json.gz") - - logging.info("About to create train dataset") - transforms = [CutMix(cuts=cuts_musan, prob=0.5, snr=(10, 20))] - if self.args.concatenate_cuts: - logging.info( - f"Using cut concatenation with duration factor " - f"{self.args.duration_factor} and gap {self.args.gap}." - ) - # Cut concatenation should be the first transform in the list, - # so that if we e.g. mix noise in, it will fill the gaps between - # different utterances. - transforms = [ - CutConcatenate( - duration_factor=self.args.duration_factor, gap=self.args.gap - ) - ] + transforms - - input_transforms = [ - SpecAugment( - num_frame_masks=2, - features_mask_size=27, - num_feature_masks=2, - frames_mask_size=100, - ) - ] - - train = K2SpeechRecognitionDataset( - cut_transforms=transforms, - input_transforms=input_transforms, - return_cuts=self.args.return_cuts, - ) - - if self.args.on_the_fly_feats: - # NOTE: the PerturbSpeed transform should be added only if we - # remove it from data prep stage. - # Add on-the-fly speed perturbation; since originally it would - # have increased epoch size by 3, we will apply prob 2/3 and use - # 3x more epochs. - # Speed perturbation probably should come first before - # concatenation, but in principle the transforms order doesn't have - # to be strict (e.g. could be randomized) - # transforms = [PerturbSpeed(factors=[0.9, 1.1], p=2/3)] + transforms # noqa - # Drop feats to be on the safe side. - train = K2SpeechRecognitionDataset( - cut_transforms=transforms, - input_strategy=OnTheFlyFeatures( - Fbank(FbankConfig(num_mel_bins=80)), - num_workers=self.args.num_workers_inner, - ), - input_transforms=input_transforms, - return_cuts=self.args.return_cuts, - ) - - if self.args.bucketing_sampler: - logging.info("Using BucketingSampler.") - train_sampler = BucketingSampler( - cuts_train, - max_duration=self.args.max_duration, - shuffle=self.args.shuffle, - num_buckets=self.args.num_buckets, - bucket_method="equal_duration", - drop_last=True, - ) - else: - logging.info("Using SingleCutSampler.") - train_sampler = SingleCutSampler( - cuts_train, - max_duration=self.args.max_duration, - shuffle=self.args.shuffle, - ) - logging.info("About to create train dataloader") - - # train_dl = DataLoader( - # train, - # sampler=train_sampler, - # batch_size=None, - # num_workers=2, - # persistent_workers=False, - # ) - - train_dl = LhotseDataLoader( - train, - sampler=train_sampler, - num_workers=self.args.num_workers, - prefetch_factor=5, - ) - - return train_dl - - def valid_dataloaders(self) -> DataLoader: - logging.info("About to get dev cuts") - cuts_valid = self.valid_cuts() - - transforms = [] - if self.args.concatenate_cuts: - transforms = [ - CutConcatenate( - duration_factor=self.args.duration_factor, gap=self.args.gap - ) - ] + transforms - - logging.info("About to create dev dataset") - if self.args.on_the_fly_feats: - validate = K2SpeechRecognitionDataset( - cut_transforms=transforms, - input_strategy=OnTheFlyFeatures( - Fbank(FbankConfig(num_mel_bins=80)) - ), - return_cuts=self.args.return_cuts, - ) - else: - validate = K2SpeechRecognitionDataset( - cut_transforms=transforms, - return_cuts=self.args.return_cuts, - ) - valid_sampler = SingleCutSampler( - cuts_valid, - max_duration=self.args.max_duration, - shuffle=False, - ) - logging.info("About to create dev dataloader") - # valid_dl = DataLoader( - # validate, - # sampler=valid_sampler, - # batch_size=None, - # num_workers=2, - # persistent_workers=False, - # ) - - valid_dl = LhotseDataLoader( - validate, - sampler=valid_sampler, - num_workers=2, - ) - - return valid_dl - - def test_dataloaders(self) -> Union[DataLoader, List[DataLoader]]: - cuts = self.test_cuts() - is_list = isinstance(cuts, list) - test_loaders = [] - if not is_list: - cuts = [cuts] - - for cuts_test in cuts: - logging.debug("About to create test dataset") - test = K2SpeechRecognitionDataset( - input_strategy=OnTheFlyFeatures( - Fbank(FbankConfig(num_mel_bins=80), num_workers=4) - if self.args.on_the_fly_feats - else PrecomputedFeatures() - ), - return_cuts=self.args.return_cuts, - ) - sampler = SingleCutSampler( - cuts_test, max_duration=self.args.max_duration - ) - logging.debug("About to create test dataloader") - # test_dl = DataLoader( - # test, batch_size=None, sampler=sampler, num_workers=1 - # ) - test_dl = LhotseDataLoader(test, sampler=sampler, num_workers=2) - test_loaders.append(test_dl) - - if is_list: - return test_loaders - else: - return test_loaders[0] - - @lru_cache() - def train_cuts(self) -> CutSet: - logging.info("About to get train cuts") - cuts_train = load_manifest( - self.args.feature_dir / "cuts_train-clean-100.json.gz" - ) - if self.args.full_libri: - cuts_train = ( - cuts_train - + load_manifest( - self.args.feature_dir / "cuts_train-clean-360.json.gz" - ) - + load_manifest( - self.args.feature_dir / "cuts_train-other-500.json.gz" - ) - ) - return cuts_train - - @lru_cache() - def valid_cuts(self) -> CutSet: - logging.info("About to get dev cuts") - cuts_valid = load_manifest( - self.args.feature_dir / "cuts_dev-clean.json.gz" - ) + load_manifest(self.args.feature_dir / "cuts_dev-other.json.gz") - return cuts_valid - - @lru_cache() - def test_cuts(self) -> List[CutSet]: - test_sets = ["test-clean", "test-other"] - cuts = [] - for test_set in test_sets: - logging.debug("About to get test cuts") - cuts.append( - load_manifest( - self.args.feature_dir / f"cuts_{test_set}.json.gz" - ) - ) - return cuts diff --git a/egs/librispeech/ASR/conformer_ctc/asr_datamodule.py b/egs/librispeech/ASR/conformer_ctc/asr_datamodule.py new file mode 120000 index 000000000..fa1b8cca3 --- /dev/null +++ b/egs/librispeech/ASR/conformer_ctc/asr_datamodule.py @@ -0,0 +1 @@ +../tdnn_lstm_ctc/asr_datamodule.py \ No newline at end of file diff --git a/egs/librispeech/ASR/conformer_ctc/decode.py b/egs/librispeech/ASR/conformer_ctc/decode.py index 77f35253e..c540b1ea1 100755 --- a/egs/librispeech/ASR/conformer_ctc/decode.py +++ b/egs/librispeech/ASR/conformer_ctc/decode.py @@ -321,7 +321,7 @@ def decode_dataset( try: num_batches = len(dl) except TypeError: - num_batches = None + num_batches = "?" results = defaultdict(list) for batch_idx, batch in enumerate(dl): @@ -350,10 +350,7 @@ def decode_dataset( num_cuts += len(batch["supervisions"]["text"]) if batch_idx % 100 == 0: - if num_batches is not None: - batch_str = f"{batch_idx}/{num_batches}" - else: - batch_str = f"{batch_idx}" + batch_str = f"{batch_idx}/{num_batches}" logging.info( f"batch {batch_str}, cuts processed until now is {num_cuts}" diff --git a/egs/librispeech/ASR/tdnn_lstm_ctc/asr_datamodule.py b/egs/librispeech/ASR/tdnn_lstm_ctc/asr_datamodule.py index b3bd823ff..748b9541c 100644 --- a/egs/librispeech/ASR/tdnn_lstm_ctc/asr_datamodule.py +++ b/egs/librispeech/ASR/tdnn_lstm_ctc/asr_datamodule.py @@ -14,7 +14,6 @@ from lhotse.dataset import ( SingleCutSampler, SpecAugment, ) -from lhotse.dataset.dataloading import LhotseDataLoader from lhotse.dataset.input_strategies import OnTheFlyFeatures from torch.utils.data import DataLoader @@ -87,7 +86,7 @@ class LibriSpeechAsrDataModule(DataModule): group.add_argument( "--concatenate-cuts", type=str2bool, - default=True, + default=False, help="When enabled, utterances (cuts) will be concatenated " "to minimize the amount of padding.", ) @@ -199,8 +198,7 @@ class LibriSpeechAsrDataModule(DataModule): train = K2SpeechRecognitionDataset( cut_transforms=transforms, input_strategy=OnTheFlyFeatures( - Fbank(FbankConfig(num_mel_bins=80)), - num_workers=self.args.num_workers_inner, + Fbank(FbankConfig(num_mel_bins=80)) ), input_transforms=input_transforms, return_cuts=self.args.return_cuts, @@ -225,19 +223,12 @@ class LibriSpeechAsrDataModule(DataModule): ) logging.info("About to create train dataloader") - # train_dl = DataLoader( - # train, - # sampler=train_sampler, - # batch_size=None, - # num_workers=2, - # persistent_workers=False, - # ) - - train_dl = LhotseDataLoader( + train_dl = DataLoader( train, sampler=train_sampler, - num_workers=self.args.num_workers, - prefetch_factor=5, + batch_size=None, + num_workers=2, + persistent_workers=False, ) return train_dl @@ -274,18 +265,12 @@ class LibriSpeechAsrDataModule(DataModule): shuffle=False, ) logging.info("About to create dev dataloader") - # valid_dl = DataLoader( - # validate, - # sampler=valid_sampler, - # batch_size=None, - # num_workers=2, - # persistent_workers=False, - # ) - - valid_dl = LhotseDataLoader( + valid_dl = DataLoader( validate, sampler=valid_sampler, + batch_size=None, num_workers=2, + persistent_workers=False, ) return valid_dl @@ -301,20 +286,19 @@ class LibriSpeechAsrDataModule(DataModule): logging.debug("About to create test dataset") test = K2SpeechRecognitionDataset( input_strategy=OnTheFlyFeatures( - Fbank(FbankConfig(num_mel_bins=80), num_workers=4) - if self.args.on_the_fly_feats - else PrecomputedFeatures() - ), + Fbank(FbankConfig(num_mel_bins=80)) + ) + if self.args.on_the_fly_feats + else PrecomputedFeatures(), return_cuts=self.args.return_cuts, ) sampler = SingleCutSampler( cuts_test, max_duration=self.args.max_duration ) logging.debug("About to create test dataloader") - # test_dl = DataLoader( - # test, batch_size=None, sampler=sampler, num_workers=1 - # ) - test_dl = LhotseDataLoader(test, sampler=sampler, num_workers=2) + test_dl = DataLoader( + test, batch_size=None, sampler=sampler, num_workers=1 + ) test_loaders.append(test_dl) if is_list: diff --git a/egs/librispeech/ASR/tdnn_lstm_ctc/decode.py b/egs/librispeech/ASR/tdnn_lstm_ctc/decode.py index 2aca804fa..72f39ef40 100755 --- a/egs/librispeech/ASR/tdnn_lstm_ctc/decode.py +++ b/egs/librispeech/ASR/tdnn_lstm_ctc/decode.py @@ -240,7 +240,7 @@ def decode_dataset( try: num_batches = len(dl) except TypeError: - num_batches = None + num_batches = "?" results = defaultdict(list) for batch_idx, batch in enumerate(dl): @@ -267,10 +267,7 @@ def decode_dataset( num_cuts += len(batch["supervisions"]["text"]) if batch_idx % 100 == 0: - if num_batches is not None: - batch_str = f"{batch_idx}/{num_batches}" - else: - batch_str = f"{batch_idx}" + batch_str = f"{batch_idx}/{num_batches}" logging.info( f"batch {batch_str}, cuts processed until now is {num_cuts}" diff --git a/icefall/checkpoint.py b/icefall/checkpoint.py index e45df4fe4..a64ecfcf6 100644 --- a/icefall/checkpoint.py +++ b/icefall/checkpoint.py @@ -91,7 +91,7 @@ def load_checkpoint( checkpoint.pop("model") def load(name, obj): - s = checkpoint[name] + s = checkpoint.get(name, None) if obj and s: obj.load_state_dict(s) checkpoint.pop(name)