From dbe270ba9411ad904eef34683378f69a6c0463f6 Mon Sep 17 00:00:00 2001 From: Kinan Martin Date: Wed, 30 Apr 2025 10:06:13 +0900 Subject: [PATCH] adjust prepare.sh to compute fbank and manifests together; adjust datamodule to load from manifest files --- .../ASR/local/compute_fbank_mls_english.py | 3 +- .../ASR/local/utils/asr_datamodule.py | 360 +++++++++++------- egs/mls_english/ASR/prepare.sh | 57 ++- 3 files changed, 261 insertions(+), 159 deletions(-) mode change 100644 => 100755 egs/mls_english/ASR/prepare.sh diff --git a/egs/mls_english/ASR/local/compute_fbank_mls_english.py b/egs/mls_english/ASR/local/compute_fbank_mls_english.py index 532c7f04e..8c5cae842 100644 --- a/egs/mls_english/ASR/local/compute_fbank_mls_english.py +++ b/egs/mls_english/ASR/local/compute_fbank_mls_english.py @@ -60,7 +60,8 @@ def make_cutset_blueprints( from datasets import load_dataset - dataset = load_dataset(mls_eng_hf_dataset_path) + logging.info(f"{mls_eng_hf_dataset_path=}") + dataset = load_dataset(str(mls_eng_hf_dataset_path)) # Create test dataset logging.info("Creating test cuts.") diff --git a/egs/mls_english/ASR/local/utils/asr_datamodule.py b/egs/mls_english/ASR/local/utils/asr_datamodule.py index 23e50fe02..250b40a63 100644 --- a/egs/mls_english/ASR/local/utils/asr_datamodule.py +++ b/egs/mls_english/ASR/local/utils/asr_datamodule.py @@ -21,13 +21,15 @@ import inspect import logging from functools import lru_cache from pathlib import Path -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional -from lhotse import CutSet, Fbank, FbankConfig +from lhotse import CutSet, Fbank, FbankConfig, load_manifest, load_manifest_lazy from lhotse.dataset import ( CutConcatenate, + CutMix, DynamicBucketingSampler, K2SpeechRecognitionDataset, + PrecomputedFeatures, SimpleCutSampler, SpecAugment, ) @@ -39,215 +41,315 @@ from icefall.utils import str2bool class MLSEnglishHFAsrDataModule: """ - DataModule for MLS English ASR experiments using HuggingFace dataset. - Handles dataset loading and provides train/valid/test dataloaders with - on-the-fly feature extraction. + DataModule for k2 ASR experiments. + It assumes there is always one train and valid dataloader, + but there can be multiple test dataloaders (e.g. LibriSpeech test-clean + and test-other). + It contains all the common data pipeline modules used in ASR + experiments, e.g.: + - dynamic batch size, + - bucketing samplers, + - cut concatenation, + - augmentation, + - on-the-fly feature extraction + This class should be derived for specific corpora used in ASR tasks.
""" def __init__(self, args: argparse.Namespace): self.args = args - self.dataset = None - - # self._validate_args() - - # def _validate_args(self) -> None: - # """Validate configuration arguments.""" - # if self.args.on_the_fly_feats is False: - # raise ValueError("This recipe requires on-the-fly feature extraction") @classmethod - def add_arguments(cls, parser: argparse.ArgumentParser) -> argparse.ArgumentParser: + def add_arguments(cls, parser: argparse.ArgumentParser): group = parser.add_argument_group( title="ASR data related options", - description="Options for data loading and processing", + description="These options are used for the preparation of " + "PyTorch DataLoaders from Lhotse CutSet's -- they control the " + "effective batch sizes, sampling strategies, applied data " + "augmentations, etc.", ) - - # Dataset configuration group.add_argument( - "--dataset-path", - type=str, - default="parler-tts/mls_eng", - help="Path to HuggingFace MLS English dataset (name or local path)", + "--manifest-dir", + type=Path, + default=Path("data/manifests"), + help="Path to directory with train/dev/test cuts.", ) - - # Sampling and batching group.add_argument( "--max-duration", - type=float, + type=int, default=200.0, - help="Maximum batch duration in seconds", + help="Maximum pooled recordings duration (seconds) in a " + "single batch. You can reduce it if it causes CUDA OOM.", ) group.add_argument( "--bucketing-sampler", type=str2bool, default=True, - help="Whether to use bucketing sampler", + help="When enabled, the batches will come from buckets of " + "similar duration (saves padding frames).", ) group.add_argument( "--num-buckets", type=int, default=30, - help="Number of buckets for DynamicBucketingSampler", + help="The number of buckets for the DynamicBucketingSampler" + "(you might want to increase it for larger datasets).", ) - - # Data augmentation group.add_argument( - "--enable-spec-aug", + "--concatenate-cuts", + type=str2bool, + default=False, + help="When enabled, utterances (cuts) will be concatenated " + "to minimize the amount of padding.", + ) + group.add_argument( + "--duration-factor", + type=float, + default=1.0, + help="Determines the maximum duration of a concatenated cut " + "relative to the duration of the longest cut in a batch.", + ) + group.add_argument( + "--gap", + type=float, + default=1.0, + help="The amount of padding (in seconds) inserted between " + "concatenated cuts. This padding is filled with noise when " + "noise augmentation is used.", + ) + group.add_argument( + "--on-the-fly-feats", + type=str2bool, + default=False, + help="When enabled, use on-the-fly cut mixing and feature " + "extraction. Will drop existing precomputed feature manifests " + "if available.", + ) + group.add_argument( + "--shuffle", type=str2bool, default=True, - help="Whether to enable SpecAugment", + help="When enabled (=default), the examples will be " + "shuffled for each epoch.", ) group.add_argument( - "--spec-aug-time-warp-factor", - type=int, - default=80, - help="Time warp factor for SpecAugment", - ) - - # Dataloader configuration - group.add_argument( - "--num-workers", - type=int, - default=2, - help="Number of workers for data loading", + "--drop-last", + type=str2bool, + default=True, + help="Whether to drop last batch. 
Used by sampler.", ) group.add_argument( "--return-cuts", type=str2bool, default=False, - help="Whether to return cuts in batch", + help="When enabled, each batch will have the " + "field: batch['supervisions']['cut'] with the cuts that " + "were used to construct it.", ) group.add_argument( - "--drop-last", - type=str2bool, - default=True, - help="Whether to drop last incomplete batch", + "--num-workers", + type=int, + default=2, + help="The number of training dataloader workers that " + "collect the batches.", ) - return parser + group.add_argument( + "--enable-spec-aug", + type=str2bool, + default=True, + help="When enabled, use SpecAugment for training dataset.", + ) - def load_dataset(self, dataset_path: Optional[str] = None) -> None: - """Load the HuggingFace dataset.""" - dataset_path = dataset_path or self.args.dataset_path - logging.info(f"Loading MLS English dataset from: {dataset_path}") + group.add_argument( + "--spec-aug-time-warp-factor", + type=int, + default=80, + help="Used only when --enable-spec-aug is True. " + "It specifies the factor for time warping in SpecAugment. " + "Larger values mean more warping. " + "A value less than 1 means to disable time warp.", + ) - try: - from datasets import load_dataset + group.add_argument( + "--enable-musan", + type=str2bool, + default=False, + help="When enabled, select noise from MUSAN and mix it" + "with training dataset. ", + ) - self.dataset = load_dataset(dataset_path) - logging.info("Dataset loaded successfully") - except ImportError: - raise ImportError("Please install datasets package: pip install datasets") - except Exception as e: - raise RuntimeError(f"Failed to load dataset: {e}") + def train_dataloaders( + self, cuts_train: CutSet, sampler_state_dict: Optional[Dict[str, Any]] = None + ) -> DataLoader: + """ + Args: + cuts_train: + CutSet for training. + sampler_state_dict: + The state dict for the training sampler. + """ - def _create_dataset( - self, cuts: CutSet, is_train: bool = False - ) -> K2SpeechRecognitionDataset: - """Create appropriate dataset with transforms.""" transforms = [] input_transforms = [] - if is_train and self.args.enable_spec_aug: - input_transforms.append(self._create_spec_augment()) + if self.args.enable_spec_aug: + logging.info("Enable SpecAugment") + logging.info(f"Time warp factor: {self.args.spec_aug_time_warp_factor}") + # Set the value of num_frame_masks according to Lhotse's version. + # In different Lhotse's versions, the default of num_frame_masks is + # different. 
+ num_frame_masks = 10 + num_frame_masks_parameter = inspect.signature( + SpecAugment.__init__ + ).parameters["num_frame_masks"] + if num_frame_masks_parameter.default == 1: + num_frame_masks = 2 + logging.info(f"Num frame mask: {num_frame_masks}") + input_transforms.append( + SpecAugment( + time_warp_factor=self.args.spec_aug_time_warp_factor, + num_frame_masks=num_frame_masks, + features_mask_size=27, + num_feature_masks=2, + frames_mask_size=100, + ) + ) + else: + logging.info("Disable SpecAugment") - return K2SpeechRecognitionDataset( + logging.info("About to create train dataset") + train = K2SpeechRecognitionDataset( cut_transforms=transforms, - input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))), input_transforms=input_transforms, return_cuts=self.args.return_cuts, ) - def _create_spec_augment(self) -> SpecAugment: - """Create SpecAugment transform based on config.""" - num_frame_masks = 10 - num_frame_masks_parameter = inspect.signature(SpecAugment.__init__).parameters[ - "num_frame_masks" - ] - if num_frame_masks_parameter.default == 1: - num_frame_masks = 2 + if self.args.on_the_fly_feats: + # NOTE: the PerturbSpeed transform should be added only if we + # remove it from data prep stage. + # Add on-the-fly speed perturbation; since originally it would + # have increased epoch size by 3, we will apply prob 2/3 and use + # 3x more epochs. + # Speed perturbation probably should come first before + # concatenation, but in principle the transforms order doesn't have + # to be strict (e.g. could be randomized) + # transforms = [PerturbSpeed(factors=[0.9, 1.1], p=2/3)] + transforms # noqa + # Drop feats to be on the safe side. + train = K2SpeechRecognitionDataset( + cut_transforms=transforms, + input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))), + input_transforms=input_transforms, + return_cuts=self.args.return_cuts, + ) - return SpecAugment( - time_warp_factor=self.args.spec_aug_time_warp_factor, - num_frame_masks=num_frame_masks, - features_mask_size=27, - num_feature_masks=2, - frames_mask_size=100, - ) - - def _create_sampler( - self, cuts: CutSet, shuffle: bool - ) -> Union[DynamicBucketingSampler, SimpleCutSampler]: - """Create appropriate sampler based on config.""" if self.args.bucketing_sampler: - return DynamicBucketingSampler( - cuts, + logging.info("Using DynamicBucketingSampler.") + train_sampler = DynamicBucketingSampler( + cuts_train, max_duration=self.args.max_duration, - shuffle=shuffle, + shuffle=self.args.shuffle, num_buckets=self.args.num_buckets, drop_last=self.args.drop_last, ) - return SimpleCutSampler( - cuts, - max_duration=self.args.max_duration, - shuffle=shuffle, - ) + else: + logging.info("Using SimpleCutSampler.") + train_sampler = SimpleCutSampler( + cuts_train, + max_duration=self.args.max_duration, + shuffle=self.args.shuffle, + ) + logging.info("About to create train dataloader") - def train_dataloader( - self, sampler_state_dict: Optional[Dict[str, Any]] = None - ) -> DataLoader: - """Create train dataloader.""" - cuts = self.train_cuts() - dataset = self._create_dataset(cuts, is_train=True) - sampler = self._create_sampler(cuts, shuffle=True) + if sampler_state_dict is not None: + logging.info("Loading sampler state dict") + train_sampler.load_state_dict(sampler_state_dict) - if sampler_state_dict: - sampler.load_state_dict(sampler_state_dict) - - return DataLoader( - dataset, - sampler=sampler, + train_dl = DataLoader( + train, + sampler=train_sampler, batch_size=None, num_workers=self.args.num_workers, 
persistent_workers=False, ) - def valid_dataloader(self) -> DataLoader: - """Create validation dataloader.""" - cuts = self.valid_cuts() - return DataLoader( - self._create_dataset(cuts), - sampler=self._create_sampler(cuts, shuffle=False), + return train_dl + + def valid_dataloaders(self, cuts_valid: CutSet) -> DataLoader: + transforms = [] + if self.args.concatenate_cuts: + transforms = [ + CutConcatenate( + duration_factor=self.args.duration_factor, gap=self.args.gap + ) + ] + transforms + + logging.info("About to create dev dataset") + if self.args.on_the_fly_feats: + validate = K2SpeechRecognitionDataset( + cut_transforms=transforms, + input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))), + return_cuts=self.args.return_cuts, + ) + else: + validate = K2SpeechRecognitionDataset( + cut_transforms=transforms, + return_cuts=self.args.return_cuts, + ) + valid_sampler = DynamicBucketingSampler( + cuts_valid, + max_duration=self.args.max_duration, + shuffle=False, + ) + logging.info("About to create dev dataloader") + valid_dl = DataLoader( + validate, + sampler=valid_sampler, batch_size=None, num_workers=2, persistent_workers=False, ) - def test_dataloader(self) -> DataLoader: - """Create test dataloader.""" - cuts = self.test_cuts() - return DataLoader( - self._create_dataset(cuts), - sampler=self._create_sampler(cuts, shuffle=False), + return valid_dl + + def test_dataloaders(self, cuts: CutSet) -> DataLoader: + logging.info("About to create test dataset") + test = K2SpeechRecognitionDataset( + input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))) + if self.args.on_the_fly_feats + else PrecomputedFeatures(), + return_cuts=self.args.return_cuts, + ) + sampler = DynamicBucketingSampler( + cuts, + max_duration=self.args.max_duration, + shuffle=False, ) + test_dl = DataLoader( + test, batch_size=None, + sampler=sampler, num_workers=self.args.num_workers, ) + return test_dl @lru_cache() def train_cuts(self) -> CutSet: - return CutSet.from_huggingface_dataset( - self.dataset["train"], text_key="transcript" + logging.info("About to get train cuts") + return load_manifest_lazy( + self.args.manifest_dir / "mls_english_cuts_train.jsonl.gz" ) @lru_cache() def valid_cuts(self) -> CutSet: - return CutSet.from_huggingface_dataset( - self.dataset["dev"], text_key="transcript" + logging.info("About to get dev cuts") + return load_manifest_lazy( + self.args.manifest_dir / "mls_english_cuts_dev.jsonl.gz" ) @lru_cache() - def test_cuts(self) -> CutSet: - return CutSet.from_huggingface_dataset( - self.dataset["test"], text_key="transcript" + def test_cuts(self) -> CutSet: + logging.info("About to get test cuts") + return load_manifest_lazy( + self.args.manifest_dir / "mls_english_cuts_test.jsonl.gz" ) diff --git a/egs/mls_english/ASR/prepare.sh b/egs/mls_english/ASR/prepare.sh old mode 100644 new mode 100755 index 72bb3a5ba..ad1c10080 --- a/egs/mls_english/ASR/prepare.sh +++ b/egs/mls_english/ASR/prepare.sh @@ -1,15 +1,12 @@ #!/usr/bin/env bash # Prepare script for MLS English ASR recipe in icefall -# This recipe uses on-the-fly feature extraction, so it skips manifest -# and feature generation steps used in other recipes. # fix segmentation fault reported in https://github.com/k2-fsa/icefall/issues/674 export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python set -eou pipefail -nj=15 stage=-1 stop_stage=100 @@ -23,6 +20,9 @@ dl_dir=$PWD/download # All files generated by this script are saved in "data".
mkdir -p data +mkdir -p data/audio +mkdir -p data/manifests +mkdir -p data/lang log() { local fname=${BASH_SOURCE[1]##*/} @@ -41,21 +41,24 @@ if [ $stage -le 0 ] && [ $stop_stage -ge 0 ]; then fi fi -if [ $stage -le 1 ] && [ $stop_stage -ge 1 ]; then - log "Stage 1: Prepare MLS English manifests and compute fbank" - # We assume that you have downloaded the MLS English corpus - # to $dl_dir/mls_english - mkdir -p data/manifests - if [ ! -e data/mls_english.done ]; then - # lhotse prepare mls_english -j $nj $dl_dir/mls_english data/manifests - python local/compute_fbank_mls_english.py --manifest-dir data/manifests --audio-dir data/audio --dl-dir $dl_dir/mls_english - touch data/manifests/.mls_english.done - fi -fi +# if [ $stage -le 1 ] && [ $stop_stage -ge 1 ]; then +# log "Stage 1: Prepare MLS English manifest" +# # We assume that you have downloaded the MLS English corpus +# # to $dl_dir/mls_english +# if [ ! -e data/manifests/.mls_english.done ]; then +# # lhotse prepare mls_english -j $nj $dl_dir/mls_english data/manifests +# python local/utils/save_audios.py --num-jobs 8 --dataset-dir $dl_dir/mls_english --audio-dir ./data/audio --manifest-dir ./data/manifests +# touch data/manifests/.mls_english.done +# fi +# fi -if [ $stage -le 2 ] && [ $stop_stage -ge 2 ]; then - log "Stage 2: Validate MLS English manifests" +if [ $stage -le 1 ] && [ $stop_stage -ge 1 ]; then + log "Stage 1: Compute MLS English fbank and manifests" if [ ! -e data/manifests/.mls_english-validated.done ]; then + python local/compute_fbank_mls_english.py \ + --manifest-dir data/manifests \ + --audio-dir data/audio \ + --dl-dir $dl_dir/mls_english python local/validate_manifest.py --manifest data/manifests/mls_english_cuts_train.jsonl.gz python local/validate_manifest.py --manifest data/manifests/mls_english_cuts_dev.jsonl.gz python local/validate_manifest.py --manifest data/manifests/mls_english_cuts_test.jsonl.gz @@ -63,21 +67,16 @@ if [ $stage -le 2 ] && [ $stop_stage -ge 2 ]; then fi fi - -mkdir -p data/lang -lang_dir=data/lang - -if [ $stage -le 3 ] && [ $stop_stage -ge 3 ]; then - log "Stage 3: Prepare transcript for BPE training" - if [ ! -f $lang_dir/transcript.txt ]; then +if [ $stage -le 2 ] && [ $stop_stage -ge 2 ]; then + log "Stage 2: Prepare transcript for BPE training" + if [ ! -f data/lang/transcript.txt ]; then log "Generating transcripts for BPE training" - ./local/utils/generate_transcript.py --lang-dir $lang_dir + ./local/utils/generate_transcript.py --lang-dir data/lang fi fi -if [ $stage -le 4 ] && [ $stop_stage -ge 4 ]; then - log "Stage 4: Prepare BPE tokenizer" - +if [ $stage -le 3 ] && [ $stop_stage -ge 3 ]; then + log "Stage 3: Prepare BPE tokenizer" for vocab_size in ${vocab_sizes[@]}; do log "Training BPE model with vocab_size=${vocab_size}" bpe_dir=data/lang/bpe_${vocab_size} @@ -87,9 +86,9 @@ if [ $stage -le 4 ] && [ $stop_stage -ge 4 ]; then ./local/train_bpe_model.py \ --lang-dir $bpe_dir \ --vocab-size $vocab_size \ - --transcript $lang_dir/transcript.txt + --transcript data/lang/transcript.txt fi done fi -log "MLS English data preparation completed successfully" +log "MLS English data preparation completed successfully" \ No newline at end of file
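
Usage sketch (illustrative, not part of the patch): after this change the datamodule no longer loads the HuggingFace dataset itself; it reads the precomputed cuts that prepare.sh writes under --manifest-dir. A training script would consume it roughly as below; the module import path and the bare argument parser follow the usual icefall recipe layout and are assumptions, not code from this patch.

    import argparse

    # asr_datamodule.py lives in local/utils/ in this recipe; the import path is an assumption.
    from asr_datamodule import MLSEnglishHFAsrDataModule


    def main():
        # Minimal parser; a real train.py would add model and optimizer options as well.
        parser = argparse.ArgumentParser(description="MLS English ASR usage sketch")
        MLSEnglishHFAsrDataModule.add_arguments(parser)
        args = parser.parse_args()

        data_module = MLSEnglishHFAsrDataModule(args)

        # Cuts are loaded lazily from --manifest-dir, e.g.
        # data/manifests/mls_english_cuts_train.jsonl.gz produced in stage 1 of prepare.sh.
        train_cuts = data_module.train_cuts()
        valid_cuts = data_module.valid_cuts()

        train_dl = data_module.train_dataloaders(train_cuts)
        valid_dl = data_module.valid_dataloaders(valid_cuts)  # passed to the training loop alongside train_dl

        # One batch as a smoke test; K2SpeechRecognitionDataset yields
        # "inputs" (padded fbank features) and "supervisions" (texts and frame ranges).
        batch = next(iter(train_dl))
        print(batch["inputs"].shape, len(batch["supervisions"]["text"]))


    if __name__ == "__main__":
        main()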