Adjusted prepare.sh to compute fbank features and manifests together in a single stage; adjusted the datamodule to load cuts from the manifest files.

This commit is contained in:
Kinan Martin 2025-04-30 10:06:13 +09:00
parent cf425173af
commit dbe270ba94
3 changed files with 261 additions and 159 deletions

View File

@@ -60,7 +60,8 @@ def make_cutset_blueprints(
from datasets import load_dataset from datasets import load_dataset
dataset = load_dataset(mls_eng_hf_dataset_path) print(f"{mls_eng_hf_dataset_path=}")
dataset = load_dataset(str(mls_eng_hf_dataset_path))
# Create test dataset # Create test dataset
logging.info("Creating test cuts.") logging.info("Creating test cuts.")

View File

@@ -21,13 +21,15 @@ import inspect
import logging import logging
from functools import lru_cache from functools import lru_cache
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Union from typing import Any, Dict, List, Optional
from lhotse import CutSet, Fbank, FbankConfig from lhotse import CutSet, Fbank, FbankConfig, load_manifest, load_manifest_lazy
from lhotse.dataset import ( from lhotse.dataset import (
CutConcatenate, CutConcatenate,
CutMix,
DynamicBucketingSampler, DynamicBucketingSampler,
K2SpeechRecognitionDataset, K2SpeechRecognitionDataset,
PrecomputedFeatures,
SimpleCutSampler, SimpleCutSampler,
SpecAugment, SpecAugment,
) )
@@ -39,215 +41,315 @@ from icefall.utils import str2bool
class MLSEnglishHFAsrDataModule: class MLSEnglishHFAsrDataModule:
""" """
DataModule for MLS English ASR experiments using HuggingFace dataset. DataModule for k2 ASR experiments.
Handles dataset loading and provides train/valid/test dataloaders with It assumes there is always one train and valid dataloader,
on-the-fly feature extraction. but there can be multiple test dataloaders (e.g. LibriSpeech test-clean
and test-other).
It contains all the common data pipeline modules used in ASR
experiments, e.g.:
- dynamic batch size,
- bucketing samplers,
- cut concatenation,
- augmentation,
- on-the-fly feature extraction
This class should be derived for specific corpora used in ASR tasks.
""" """
def __init__(self, args: argparse.Namespace): def __init__(self, args: argparse.Namespace):
self.args = args self.args = args
self.dataset = None
# self._validate_args()
# def _validate_args(self) -> None:
# """Validate configuration arguments."""
# if self.args.on_the_fly_feats is False:
# raise ValueError("This recipe requires on-the-fly feature extraction")
@classmethod @classmethod
def add_arguments(cls, parser: argparse.ArgumentParser) -> argparse.ArgumentParser: def add_arguments(cls, parser: argparse.ArgumentParser):
group = parser.add_argument_group( group = parser.add_argument_group(
title="ASR data related options", title="ASR data related options",
description="Options for data loading and processing", description="These options are used for the preparation of "
"PyTorch DataLoaders from Lhotse CutSet's -- they control the "
"effective batch sizes, sampling strategies, applied data "
"augmentations, etc.",
) )
# Dataset configuration
group.add_argument( group.add_argument(
"--dataset-path", "--manifest-dir",
type=str, type=Path,
default="parler-tts/mls_eng", default=Path("data/manifests"),
help="Path to HuggingFace MLS English dataset (name or local path)", help="Path to directory with train/dev/test cuts.",
) )
# Sampling and batching
group.add_argument( group.add_argument(
"--max-duration", "--max-duration",
type=float, type=int,
default=200.0, default=200.0,
help="Maximum batch duration in seconds", help="Maximum pooled recordings duration (seconds) in a "
"single batch. You can reduce it if it causes CUDA OOM.",
) )
group.add_argument( group.add_argument(
"--bucketing-sampler", "--bucketing-sampler",
type=str2bool, type=str2bool,
default=True, default=True,
help="Whether to use bucketing sampler", help="When enabled, the batches will come from buckets of "
"similar duration (saves padding frames).",
) )
group.add_argument( group.add_argument(
"--num-buckets", "--num-buckets",
type=int, type=int,
default=30, default=30,
help="Number of buckets for DynamicBucketingSampler", help="The number of buckets for the DynamicBucketingSampler"
"(you might want to increase it for larger datasets).",
) )
# Data augmentation
group.add_argument( group.add_argument(
"--enable-spec-aug", "--concatenate-cuts",
type=str2bool,
default=False,
help="When enabled, utterances (cuts) will be concatenated "
"to minimize the amount of padding.",
)
group.add_argument(
"--duration-factor",
type=float,
default=1.0,
help="Determines the maximum duration of a concatenated cut "
"relative to the duration of the longest cut in a batch.",
)
group.add_argument(
"--gap",
type=float,
default=1.0,
help="The amount of padding (in seconds) inserted between "
"concatenated cuts. This padding is filled with noise when "
"noise augmentation is used.",
)
group.add_argument(
"--on-the-fly-feats",
type=str2bool,
default=False,
help="When enabled, use on-the-fly cut mixing and feature "
"extraction. Will drop existing precomputed feature manifests "
"if available.",
)
group.add_argument(
"--shuffle",
type=str2bool, type=str2bool,
default=True, default=True,
help="Whether to enable SpecAugment", help="When enabled (=default), the examples will be "
"shuffled for each epoch.",
) )
group.add_argument( group.add_argument(
"--spec-aug-time-warp-factor", "--drop-last",
type=int, type=str2bool,
default=80, default=True,
help="Time warp factor for SpecAugment", help="Whether to drop last batch. Used by sampler.",
)
# Dataloader configuration
group.add_argument(
"--num-workers",
type=int,
default=2,
help="Number of workers for data loading",
) )
group.add_argument( group.add_argument(
"--return-cuts", "--return-cuts",
type=str2bool, type=str2bool,
default=False, default=False,
help="Whether to return cuts in batch", help="When enabled, each batch will have the "
"field: batch['supervisions']['cut'] with the cuts that "
"were used to construct it.",
) )
group.add_argument( group.add_argument(
"--drop-last", "--num-workers",
type=str2bool, type=int,
default=True, default=2,
help="Whether to drop last incomplete batch", help="The number of training dataloader workers that "
"collect the batches.",
) )
return parser group.add_argument(
"--enable-spec-aug",
type=str2bool,
default=True,
help="When enabled, use SpecAugment for training dataset.",
)
def load_dataset(self, dataset_path: Optional[str] = None) -> None: group.add_argument(
"""Load the HuggingFace dataset.""" "--spec-aug-time-warp-factor",
dataset_path = dataset_path or self.args.dataset_path type=int,
logging.info(f"Loading MLS English dataset from: {dataset_path}") default=80,
help="Used only when --enable-spec-aug is True. "
"It specifies the factor for time warping in SpecAugment. "
"Larger values mean more warping. "
"A value less than 1 means to disable time warp.",
)
try: group.add_argument(
from datasets import load_dataset "--enable-musan",
type=str2bool,
default=False,
help="When enabled, select noise from MUSAN and mix it"
"with training dataset. ",
)
self.dataset = load_dataset(dataset_path) def train_dataloaders(
logging.info("Dataset loaded successfully") self, cuts_train: CutSet, sampler_state_dict: Optional[Dict[str, Any]] = None
except ImportError: ) -> DataLoader:
raise ImportError("Please install datasets package: pip install datasets") """
except Exception as e: Args:
raise RuntimeError(f"Failed to load dataset: {e}") cuts_train:
CutSet for training.
sampler_state_dict:
The state dict for the training sampler.
"""
def _create_dataset(
self, cuts: CutSet, is_train: bool = False
) -> K2SpeechRecognitionDataset:
"""Create appropriate dataset with transforms."""
transforms = [] transforms = []
input_transforms = [] input_transforms = []
if is_train and self.args.enable_spec_aug: if self.args.enable_spec_aug:
input_transforms.append(self._create_spec_augment()) logging.info("Enable SpecAugment")
logging.info(f"Time warp factor: {self.args.spec_aug_time_warp_factor}")
return K2SpeechRecognitionDataset( # Set the value of num_frame_masks according to Lhotse's version.
cut_transforms=transforms, # In different Lhotse's versions, the default of num_frame_masks is
input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))), # different.
input_transforms=input_transforms,
return_cuts=self.args.return_cuts,
)
def _create_spec_augment(self) -> SpecAugment:
"""Create SpecAugment transform based on config."""
num_frame_masks = 10 num_frame_masks = 10
num_frame_masks_parameter = inspect.signature(SpecAugment.__init__).parameters[ num_frame_masks_parameter = inspect.signature(
"num_frame_masks" SpecAugment.__init__
] ).parameters["num_frame_masks"]
if num_frame_masks_parameter.default == 1: if num_frame_masks_parameter.default == 1:
num_frame_masks = 2 num_frame_masks = 2
logging.info(f"Num frame mask: {num_frame_masks}")
return SpecAugment( input_transforms.append(
SpecAugment(
time_warp_factor=self.args.spec_aug_time_warp_factor, time_warp_factor=self.args.spec_aug_time_warp_factor,
num_frame_masks=num_frame_masks, num_frame_masks=num_frame_masks,
features_mask_size=27, features_mask_size=27,
num_feature_masks=2, num_feature_masks=2,
frames_mask_size=100, frames_mask_size=100,
) )
)
else:
logging.info("Disable SpecAugment")
logging.info("About to create train dataset")
train = K2SpeechRecognitionDataset(
cut_transforms=transforms,
input_transforms=input_transforms,
return_cuts=self.args.return_cuts,
)
if self.args.on_the_fly_feats:
# NOTE: the PerturbSpeed transform should be added only if we
# remove it from data prep stage.
# Add on-the-fly speed perturbation; since originally it would
# have increased epoch size by 3, we will apply prob 2/3 and use
# 3x more epochs.
# Speed perturbation probably should come first before
# concatenation, but in principle the transforms order doesn't have
# to be strict (e.g. could be randomized)
# transforms = [PerturbSpeed(factors=[0.9, 1.1], p=2/3)] + transforms # noqa
# Drop feats to be on the safe side.
train = K2SpeechRecognitionDataset(
cut_transforms=transforms,
input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))),
input_transforms=input_transforms,
return_cuts=self.args.return_cuts,
)
def _create_sampler(
self, cuts: CutSet, shuffle: bool
) -> Union[DynamicBucketingSampler, SimpleCutSampler]:
"""Create appropriate sampler based on config."""
if self.args.bucketing_sampler: if self.args.bucketing_sampler:
return DynamicBucketingSampler( logging.info("Using DynamicBucketingSampler.")
cuts, train_sampler = DynamicBucketingSampler(
cuts_train,
max_duration=self.args.max_duration, max_duration=self.args.max_duration,
shuffle=shuffle, shuffle=self.args.shuffle,
num_buckets=self.args.num_buckets, num_buckets=self.args.num_buckets,
drop_last=self.args.drop_last, drop_last=self.args.drop_last,
) )
return SimpleCutSampler( else:
cuts, logging.info("Using SimpleCutSampler.")
train_sampler = SimpleCutSampler(
cuts_train,
max_duration=self.args.max_duration, max_duration=self.args.max_duration,
shuffle=shuffle, shuffle=self.args.shuffle,
) )
logging.info("About to create train dataloader")
def train_dataloader( if sampler_state_dict is not None:
self, sampler_state_dict: Optional[Dict[str, Any]] = None logging.info("Loading sampler state dict")
) -> DataLoader: train_sampler.load_state_dict(sampler_state_dict)
"""Create train dataloader."""
cuts = self.train_cuts()
dataset = self._create_dataset(cuts, is_train=True)
sampler = self._create_sampler(cuts, shuffle=True)
if sampler_state_dict: train_dl = DataLoader(
sampler.load_state_dict(sampler_state_dict) train,
sampler=train_sampler,
return DataLoader(
dataset,
sampler=sampler,
batch_size=None, batch_size=None,
num_workers=self.args.num_workers, num_workers=self.args.num_workers,
persistent_workers=False, persistent_workers=False,
) )
def valid_dataloader(self) -> DataLoader: return train_dl
"""Create validation dataloader."""
cuts = self.valid_cuts() def valid_dataloaders(self, cuts_valid: CutSet) -> DataLoader:
return DataLoader( transforms = []
self._create_dataset(cuts), if self.args.concatenate_cuts:
sampler=self._create_sampler(cuts, shuffle=False), transforms = [
CutConcatenate(
duration_factor=self.args.duration_factor, gap=self.args.gap
)
] + transforms
logging.info("About to create dev dataset")
if self.args.on_the_fly_feats:
validate = K2SpeechRecognitionDataset(
cut_transforms=transforms,
input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))),
return_cuts=self.args.return_cuts,
)
else:
validate = K2SpeechRecognitionDataset(
cut_transforms=transforms,
return_cuts=self.args.return_cuts,
)
valid_sampler = DynamicBucketingSampler(
cuts_valid,
max_duration=self.args.max_duration,
shuffle=False,
)
logging.info("About to create dev dataloader")
valid_dl = DataLoader(
validate,
sampler=valid_sampler,
batch_size=None, batch_size=None,
num_workers=2, num_workers=2,
persistent_workers=False, persistent_workers=False,
) )
def test_dataloader(self) -> DataLoader: return valid_dl
"""Create test dataloader."""
cuts = self.test_cuts() def test_dataloaders(self, cuts: CutSet) -> DataLoader:
return DataLoader( logging.info("About to create test dataset")
self._create_dataset(cuts), test = K2SpeechRecognitionDataset(
sampler=self._create_sampler(cuts, shuffle=False), input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80)))
if self.args.on_the_fly_feats
else PrecomputedFeatures(),
return_cuts=self.args.return_cuts,
)
sampler = DynamicBucketingSampler(
cuts,
max_duration=self.args.max_duration,
shuffle=False,
)
test_dl = DataLoader(
test,
batch_size=None, batch_size=None,
sampler=sampler,
num_workers=self.args.num_workers, num_workers=self.args.num_workers,
) )
return test_dl
@lru_cache() @lru_cache()
def train_cuts(self) -> CutSet: def train_cuts(self) -> CutSet:
return CutSet.from_huggingface_dataset( logging.info("About to get train cuts")
self.dataset["train"], text_key="transcript" return load_manifest_lazy(
self.args.manifest_dir / "mls_english_cuts_train.jsonl.gz"
) )
@lru_cache() @lru_cache()
def valid_cuts(self) -> CutSet: def valid_cuts(self) -> CutSet:
return CutSet.from_huggingface_dataset( logging.info("About to get dev cuts")
self.dataset["dev"], text_key="transcript" return load_manifest_lazy(
self.args.manifest_dir / "mls_english_cuts_dev.jsonl.gz"
) )
@lru_cache() @lru_cache()
def test_cuts(self) -> CutSet: def test_cuts(self) -> List[CutSet]:
return CutSet.from_huggingface_dataset( logging.info("About to get test cuts")
self.dataset["test"], text_key="transcript" return load_manifest_lazy(
self.args.manifest_dir / "mls_english_cuts_test.jsonl.gz"
) )

59
egs/mls_english/ASR/prepare.sh Normal file → Executable file
View File

@@ -1,15 +1,12 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Prepare script for MLS English ASR recipe in icefall # Prepare script for MLS English ASR recipe in icefall
# This recipe uses on-the-fly feature extraction, so it skips manifest
# and feature generation steps used in other recipes.
# fix segmentation fault reported in https://github.com/k2-fsa/icefall/issues/674 # fix segmentation fault reported in https://github.com/k2-fsa/icefall/issues/674
export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
set -eou pipefail set -eou pipefail
nj=15
stage=-1 stage=-1
stop_stage=100 stop_stage=100
@@ -23,6 +20,9 @@ dl_dir=$PWD/download
# All files generated by this script are saved in "data". # All files generated by this script are saved in "data".
mkdir -p data mkdir -p data
mkdir -p data/audio # Add this line
mkdir -p data/manifests
mkdir -p data/lang
log() { log() {
local fname=${BASH_SOURCE[1]##*/} local fname=${BASH_SOURCE[1]##*/}
@@ -41,21 +41,25 @@ if [ $stage -le 0 ] && [ $stop_stage -ge 0 ]; then
fi fi
fi fi
if [ $stage -le 1 ] && [ $stop_stage -ge 1 ]; then # if [ $stage -le 1 ] && [ $stop_stage -ge 1 ]; then
log "Stage 1: Prepare MLS English manifests and compute fbank" # log "Stage 1: Prepare MLS English manifest"
# We assume that you have downloaded the MLS English corpus # # We assume that you have downloaded the MLS English corpus
# to $dl_dir/mls_english # # to $dl_dir/mls_english
mkdir -p data/manifests # if [ ! -e data/manifests/.mls_english.done ]; then
if [ ! -e data/mls_english.done ]; then # # lhotse prepare mls_english -j $nj $dl_dir/mls_english data/manifests
# lhotse prepare mls_english -j $nj $dl_dir/mls_english data/manifests # python local/utils/save_audios.py --num-jobs 8 --dataset-dir $dl_dir/mls_english --audio-dir ./data/audio --manifest-dir ./data/manifests
python local/compute_fbank_mls_english.py --manifest-dir data/manifests --audio-dir data/audio --dl-dir $dl_dir/mls_english # touch data/manifests/.mls_english.done
touch data/manifests/.mls_english.done # fi
fi # fi
fi
if [ $stage -le 2 ] && [ $stop_stage -ge 2 ]; then if [ $stage -le 1 ] && [ $stop_stage -ge 1 ]; then
log "Stage 2: Validate MLS English manifests" log "Stage 1: Compute MLS English fbank"
if [ ! -e data/manifests/.mls_english-validated.done ]; then if [ ! -e data/manifests/.mls_english-validated.done ]; then
python local/compute_fbank_mls_english.py \
--manifest-dir data/manifests \
--audio-dir data/audio \
--dl-dir $dl_dir/mls_english
# --dl-dir /root/datasets/parler-tts--mls_eng
python local/validate_manifest.py --manifest data/manifests/mls_english_cuts_train.jsonl.gz python local/validate_manifest.py --manifest data/manifests/mls_english_cuts_train.jsonl.gz
python local/validate_manifest.py --manifest data/manifests/mls_english_cuts_dev.jsonl.gz python local/validate_manifest.py --manifest data/manifests/mls_english_cuts_dev.jsonl.gz
python local/validate_manifest.py --manifest data/manifests/mls_english_cuts_test.jsonl.gz python local/validate_manifest.py --manifest data/manifests/mls_english_cuts_test.jsonl.gz
@@ -63,21 +67,16 @@ if [ $stage -le 2 ] && [ $stop_stage -ge 2 ]; then
fi fi
fi fi
if [ $stage -le 2 ] && [ $stop_stage -ge 2 ]; then
mkdir -p data/lang log "Stage 2: Prepare transcript for BPE training"
lang_dir=data/lang if [ ! -f data/lang/transcript.txt ]; then
log "Generating transcripts for BPE training"
./local/utils/generate_transcript.py --lang-dir data/lang
fi
fi
if [ $stage -le 3 ] && [ $stop_stage -ge 3 ]; then if [ $stage -le 3 ] && [ $stop_stage -ge 3 ]; then
log "Stage 3: Prepare transcript for BPE training" log "Stage 3: Prepare BPE tokenizer"
if [ ! -f $lang_dir/transcript.txt ]; then
log "Generating transcripts for BPE training"
./local/utils/generate_transcript.py --lang-dir $lang_dir
fi
fi
if [ $stage -le 4 ] && [ $stop_stage -ge 4 ]; then
log "Stage 4: Prepare BPE tokenizer"
for vocab_size in ${vocab_sizes[@]}; do for vocab_size in ${vocab_sizes[@]}; do
log "Training BPE model with vocab_size=${vocab_size}" log "Training BPE model with vocab_size=${vocab_size}"
bpe_dir=data/lang/bpe_${vocab_size} bpe_dir=data/lang/bpe_${vocab_size}
@@ -87,7 +86,7 @@ if [ $stage -le 4 ] && [ $stop_stage -ge 4 ]; then
./local/train_bpe_model.py \ ./local/train_bpe_model.py \
--lang-dir $bpe_dir \ --lang-dir $bpe_dir \
--vocab-size $vocab_size \ --vocab-size $vocab_size \
--transcript $lang_dir/transcript.txt --transcript data/lang/transcript.txt
fi fi
done done
fi fi