From 07ed265a84ba1ceede0a9a4bfdc534bfaa1c46e6 Mon Sep 17 00:00:00 2001 From: dohe0342 Date: Fri, 9 Jun 2023 14:17:51 +0900 Subject: [PATCH] from local --- egs/tedlium3/ASR/.prepare.sh.swp | Bin 0 -> 4096 bytes .../.asr_datamodule.py.swp | Bin 0 -> 16384 bytes .../asr_datamodule.py | 335 +-- .../asr_datamodule_libri.py | 559 +++++ .../bias_compare.py | 11 - .../decode_new.py | 834 ------- .../train_uda.py | 1960 ----------------- 7 files changed, 631 insertions(+), 3068 deletions(-) create mode 100644 egs/tedlium3/ASR/.prepare.sh.swp create mode 100644 egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/.asr_datamodule.py.swp create mode 100644 egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/asr_datamodule_libri.py delete mode 100644 egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/bias_compare.py delete mode 100755 egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/decode_new.py delete mode 100755 egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/train_uda.py diff --git a/egs/tedlium3/ASR/.prepare.sh.swp b/egs/tedlium3/ASR/.prepare.sh.swp new file mode 100644 index 0000000000000000000000000000000000000000..1407de8a712849666735f78e207f1ae541fa50d2 GIT binary patch literal 4096 zcmYc?2=nw+u+%eP00IF9hW4NJDLfOdv&>n*z>t!kk!oOUVg!=H2V8*aQcE*+eG`+N z^7FF;it_bx@{<#D(Dl^84Ad{rFUl@1NK8)E&rD8DOU%j9Pfah@FG)?w$t=w^)^`jJ z(l01VEl4a%)ho_GH)fPS8UmvsKvoFwG8h{fg0s4^lA?mJP$*fZjA|MUfzc2c4S~@R X7!85Z5Eu=C(GVC7fzc2cnjruH2Cyh{ literal 0 HcmV?d00001 diff --git a/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/.asr_datamodule.py.swp b/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/.asr_datamodule.py.swp new file mode 100644 index 0000000000000000000000000000000000000000..1b005d01f3d4cbdf550b58afe7690d8018c1831c GIT binary patch literal 16384 zcmeHOON<;x8E%5Hi4(^tZ~=iR#)>kV%ws*~5DbGnyKBReec@flI0|%Xx@&sc+uhaf z>fZHO1SBLthyz05aX^tAI1vyC4iE(5078fh5(Fd^@e;X2;&Ff=gFp`7U)??3Gafrx z2RKCC^0%{7@BjNBRrPPXKmWk;DtlXN-oW!3!}!$KKVpCIFTdJ%*QX7`mP0Xr?5!QD zy3w6S9GW))eeYUfXdH<}q`luMF7^oPi7^oPi7^oPi z7^oPi7^oPi7^oPi82BGBz-_}gigEu*7jgXkKfnJ!eb_KQ4}2C_16F{8z>n`RjK_d4 z0Ox@rPy>E@$S@uSR)Gd^FK`d=2H-Z}kGC7f&w;N1PXOnDcK~(ZjllD_8OHB`p8!t+ z-vUO!dw~t02{eFvfy2N7;P(d&<8iL?n_X9oP2yh7a>#GdotH1zw z2si<}3AhD#cAsHf28O@~fCb<#;4p9n36ie?=YfZTC7=cz0RDvJ%$I-*fCJnQ+ymSR zkly}rGf5OydThLdk$7?UXluSoM zwzzGZJaQ6W1aW;l=-l`>nx+;@=DUIGyO#uuhl1f%jwFbg9Iz1m-N0#;#-$!Z;f3AW z*`WxS2zcKUc7r8xEbx|yB^0Y$Nv!7BuIqWMFPJ3*3-62sfX!+K4Uj*%$fvlcM6q_N z)hz`bCaWNKY0M_gR$Aa51h0=|0@0me%yu~>1`%#xMS?5A+?Wky#5}MkB1SK` zC;_QvQ?Uisg2eBmC0Oex);8Me{>10fDB!+pE$Cv?u+Y5JgIz=vd?2{i0)eLmh=S%kp7qq@G@KAtti=( z>Qk9PV^d{37S^!`-7AdXWvgznb1(#p2P_iwmJ=!89nepUCkiqxo>}U#_4U=++F#`t zT|eDN5Ya3xs57Qi^9N$SGWP=*I}YFKR?s;XKCN(7st+jQ|eEBc!S47GJD&_ zsu4g&P#={<+zr}0+;eTb52nBad~90y+<9q?dW}4!J?QgbyEa!SWbDb*XtEey?DN1K zVE87czbPSVJaH(r=@_UQOvKjoDr;gefN1Vcrfa4U=Fn6d50u0ZV-T4fmkN0pdM;E* z!b~ZtwlUh0ku_vXs9TZT77;w0EPQ1sV+4%FB;FM9-HJ7v6h%iA_u@!;I;2jUzOo`W zq`=*+A$Fj*)ZY~2kzg=6dmKE%NQ68M@)TPp2a4U;6Wto#)JvBf2~S7aY_w@kSr^V7 zDG^QV3#Rs(ba#MGS$WS;b;|=UX|Lm0ouhwk^|C!>Lk~B1+#p4tusp7mFXExJvA<`J z65gPNjtaBWhpwP`OcR?J4gci>u3KorV|r+mWih$e*bZPiLgY%*+|P*ro_jo3i&I05+q_~NoKpLOjpQ|Ux(TFVQX64fBQAzmI~g(AZ_8<=TrTSgVz?n} z0{bXMSOisH2C1e)Hx!;52$sHfsmem8((%A%_I5|>qa=R1PEvEK-7R6$0(vScy22I=?R422tF0tHCwjRZn@NQ>mpWBRxiK!rXdk7)k}* z)H@4>gLHpNOYiTXp|FOhu@N}1dl(I^0~2QS%swQfl^AT*y*U1Tu{cpRW{HrmxL&Tz z3)48&V5cZnXCD}F;5rRP7GUO4dFrd_@k@BOSF?}98HLV_&t5QUuqD@u8*E*NF}!rF zn}rDap4-pc(H@Xi43Yr5jVv_hMTu!~oF9yAOiwxvrpUh{IOxCw^&ep#4lY^9XGs8) z)!wlJlL(Y(p~0p|53*U&IF^nhgU16W;f`RA+!0YgvcL;{S1G6kRvp+FLyq?Hs#*d2I=tgeaU33Rw|94UTO2WVC|WXd+Fr__ zYOgFWo?P2J+3etjbm~H{CF(`NfhUz75K}GajR*K2W1&AY+P1XaL~`E*2;* zS|Y@;Dxfe1V(6-n9X1f!O3KTAEfdw#Ys?!p*aL{GP!z-ZSe+tcE$dX9d%IpR89=Mm z9{&$nKGWI@~OW$RS>GndN z%<4Ve={Q>j;cA$`zSv44=RnEKAvRnYN9;%6#eS`}r9t<9y3hU{cU!vuFW`foU*fL+ zMc^~Qqd*^c5Ab^6D(?Hw0e=I&2T(rXk z_&)Cbp9CHOBya*a1W-QUD)3X_+rYd%Z z$a<~UmvjY`%n|s=lR8;*x{p(t9#GFa$C~fVavIs~etbr+&lR`(m%Mn~F88>0^w9aJ z($Z11=am;cks`)fiWi`oF1f{e4eCnI^=UA4J$~Fs8~W-@x-7}0@6&shDqD!wrR|L; zpA;5%9fcbSat{(g~DX^Vd1lq{Q4_wh9e4RCrL9&k_}43I#ZAw2S{W}X=D7SflC zi`5bZV^1uK%S@0Yvv2tIVs$jmq^77ORjIaw(n?CD#f;RPX2opR&o>l23jEb2b#CfN z&r6=22AN2pjJnf#le*44q(9TdL!FGu7oOQix~qHN{M_7h3i-v$guH#BE*Y4(9-7GS znUvpCg|IKygpi-f?PnY#DVLPx%Cg+$Soz1Zkg20*VY*oglgP;&ROiS{c&I^z^h{E} zcNgtUn!yd7E%()Q6*HYxSHYW#QS&9gfKyKGHknUS&@On(DZN(i|GG=+RZk0%N&NqF ZN)i-ClKw|}{9VykUBvf5>0cYhe*oe2VQ2sV literal 0 HcmV?d00001 diff --git a/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/asr_datamodule.py b/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/asr_datamodule.py index 1ecda2668..c647392f0 100644 --- a/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/asr_datamodule.py +++ b/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/asr_datamodule.py @@ -1,5 +1,5 @@ # Copyright 2021 Piotr Żelasko -# Copyright 2022 Xiaomi Corporation (Author: Mingshuang Luo) +# Copyright 2021 Xiaomi Corporation (Author: Mingshuang Luo) # # See ../../../../LICENSE for clarification regarding multiple authors # @@ -17,48 +17,32 @@ import argparse -import inspect import logging -from glob import glob from functools import lru_cache from pathlib import Path from typing import Any, Dict, Optional -import torch from lhotse import CutSet, Fbank, FbankConfig, load_manifest, load_manifest_lazy -from lhotse.dataset import ( # noqa F401 for PrecomputedFeatures +from lhotse.dataset import ( CutConcatenate, CutMix, DynamicBucketingSampler, K2SpeechRecognitionDataset, - PrecomputedFeatures, SingleCutSampler, SpecAugment, ) -from lhotse.dataset.input_strategies import ( # noqa F401 For AudioSamples - AudioSamples, - OnTheFlyFeatures, -) -from lhotse.utils import fix_random_seed +from lhotse.dataset.input_strategies import OnTheFlyFeatures from torch.utils.data import DataLoader from icefall.utils import str2bool -class _SeedWorkers: - def __init__(self, seed: int): - self.seed = seed - - def __call__(self, worker_id: int): - fix_random_seed(self.seed + worker_id) - - -class LibriSpeechAsrDataModule: +class TedLiumAsrDataModule: """ DataModule for k2 ASR experiments. It assumes there is always one train and valid dataloader, - but there can be multiple test dataloaders (e.g. LibriSpeech test-clean - and test-other). + but there can be multiple test dataloaders (e.g. TEDLium3 dev + and test). It contains all the common data pipeline modules used in ASR experiments, e.g.: @@ -83,12 +67,6 @@ class LibriSpeechAsrDataModule: "effective batch sizes, sampling strategies, applied data " "augmentations, etc.", ) - group.add_argument( - "--full-libri", - type=str2bool, - default=False, - help="When enabled, use 960h LibriSpeech. Otherwise, use 100h subset.", - ) group.add_argument( "--manifest-dir", type=Path, @@ -98,7 +76,7 @@ class LibriSpeechAsrDataModule: group.add_argument( "--max-duration", type=int, - default=250.0, + default=200.0, help="Maximum pooled recordings duration (seconds) in a " "single batch. You can reduce it if it causes CUDA OOM.", ) @@ -153,12 +131,6 @@ class LibriSpeechAsrDataModule: help="When enabled (=default), the examples will be " "shuffled for each epoch.", ) - group.add_argument( - "--drop-last", - type=str2bool, - default=True, - help="Whether to drop last batch. Used by sampler.", - ) group.add_argument( "--return-cuts", type=str2bool, @@ -167,7 +139,6 @@ class LibriSpeechAsrDataModule: "field: batch['supervisions']['cut'] with the cuts that " "were used to construct it.", ) - group.add_argument( "--num-workers", type=int, @@ -175,14 +146,12 @@ class LibriSpeechAsrDataModule: help="The number of training dataloader workers that " "collect the batches.", ) - group.add_argument( "--enable-spec-aug", type=str2bool, - default=False, + default=True, help="When enabled, use SpecAugment for training dataset.", ) - group.add_argument( "--spec-aug-time-warp-factor", type=int, @@ -192,38 +161,16 @@ class LibriSpeechAsrDataModule: "Larger values mean more warping. " "A value less than 1 means to disable time warp.", ) - group.add_argument( "--enable-musan", type=str2bool, default=True, help="When enabled, select noise from MUSAN and mix it" - "with training dataset. ", - ) - - group.add_argument( - "--input-strategy", - type=str, - default="AudioSamples", - help="AudioSamples or PrecomputedFeatures", - ) - - group.add_argument( - "--spk-id", - type=int, - default=0, - ) - - group.add_argument( - "--prefix", - type=str, - default='vox', + "with training dataset.", ) def train_dataloaders( - self, - cuts_train: CutSet, - sampler_state_dict: Optional[Dict[str, Any]] = None, + self, cuts_train: CutSet, sampler_state_dict: Optional[Dict[str, Any]] = None ) -> DataLoader: """ Args: @@ -232,10 +179,30 @@ class LibriSpeechAsrDataModule: sampler_state_dict: The state dict for the training sampler. """ + + input_transforms = [] + if self.args.enable_spec_aug: + logging.info("Enable SpecAugment") + logging.info(f"Time warp factor: {self.args.spec_aug_time_warp_factor}") + + input_transforms.append( + SpecAugment( + time_warp_factor=self.args.spec_aug_time_warp_factor, + num_frame_masks=10, + features_mask_size=27, + num_feature_masks=2, + frames_mask_size=100, + max_frames_mask_fraction=0.15, + p=0.9, + ) + ) + else: + logging.info("Disable SpecAugment") + + logging.info("About to get Musan cuts") transforms = [] if self.args.enable_musan: logging.info("Enable MUSAN") - logging.info("About to get Musan cuts") cuts_musan = load_manifest(self.args.manifest_dir / "musan_cuts.jsonl.gz") transforms.append( CutMix(cuts=cuts_musan, prob=0.5, snr=(10, 20), preserve_id=True) @@ -257,40 +224,7 @@ class LibriSpeechAsrDataModule: ) ] + transforms - input_transforms = [] - if self.args.enable_spec_aug: - logging.info("Enable SpecAugment") - logging.info(f"Time warp factor: {self.args.spec_aug_time_warp_factor}") - # Set the value of num_frame_masks according to Lhotse's version. - # In different Lhotse's versions, the default of num_frame_masks is - # different. - num_frame_masks = 10 - num_frame_masks_parameter = inspect.signature( - SpecAugment.__init__ - ).parameters["num_frame_masks"] - if num_frame_masks_parameter.default == 1: - num_frame_masks = 2 - logging.info(f"Num frame mask: {num_frame_masks}") - input_transforms.append( - SpecAugment( - time_warp_factor=self.args.spec_aug_time_warp_factor, - num_frame_masks=num_frame_masks, - features_mask_size=27, - num_feature_masks=2, - frames_mask_size=100, - ) - ) - else: - logging.info("Disable SpecAugment") - logging.info("About to create train dataset") - train = K2SpeechRecognitionDataset( - input_strategy=eval(self.args.input_strategy)(), - cut_transforms=transforms, - input_transforms=input_transforms, - return_cuts=self.args.return_cuts, - ) - if self.args.on_the_fly_feats: # NOTE: the PerturbSpeed transform should be added only if we # remove it from data prep stage. @@ -308,6 +242,12 @@ class LibriSpeechAsrDataModule: input_transforms=input_transforms, return_cuts=self.args.return_cuts, ) + else: + train = K2SpeechRecognitionDataset( + cut_transforms=transforms, + input_transforms=input_transforms, + return_cuts=self.args.return_cuts, + ) if self.args.bucketing_sampler: logging.info("Using DynamicBucketingSampler.") @@ -316,7 +256,7 @@ class LibriSpeechAsrDataModule: max_duration=self.args.max_duration, shuffle=self.args.shuffle, num_buckets=self.args.num_buckets, - drop_last=self.args.drop_last, + drop_last=True, ) else: logging.info("Using SingleCutSampler.") @@ -325,29 +265,24 @@ class LibriSpeechAsrDataModule: max_duration=self.args.max_duration, shuffle=self.args.shuffle, ) - logging.info("About to create train dataloader") if sampler_state_dict is not None: logging.info("Loading sampler state dict") train_sampler.load_state_dict(sampler_state_dict) - # 'seed' is derived from the current random state, which will have - # previously been set in the main process. - seed = torch.randint(0, 100000, ()).item() - worker_init_fn = _SeedWorkers(seed) - + logging.info("About to create train dataloader") train_dl = DataLoader( train, sampler=train_sampler, batch_size=None, num_workers=self.args.num_workers, persistent_workers=False, - worker_init_fn=worker_init_fn, ) return train_dl def valid_dataloaders(self, cuts_valid: CutSet) -> DataLoader: + transforms = [] if self.args.concatenate_cuts: transforms = [ @@ -360,21 +295,21 @@ class LibriSpeechAsrDataModule: if self.args.on_the_fly_feats: validate = K2SpeechRecognitionDataset( cut_transforms=transforms, - input_strategy=eval(self.args.input_strategy)(), - #input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))), + input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))), return_cuts=self.args.return_cuts, ) else: validate = K2SpeechRecognitionDataset( cut_transforms=transforms, - input_strategy=eval(self.args.input_strategy)(), return_cuts=self.args.return_cuts, ) + valid_sampler = DynamicBucketingSampler( cuts_valid, max_duration=self.args.max_duration, shuffle=False, ) + logging.info("About to create dev dataloader") valid_dl = DataLoader( validate, @@ -386,174 +321,48 @@ class LibriSpeechAsrDataModule: return valid_dl - def test_dataloaders(self, cuts: CutSet) -> DataLoader: + def test_dataloaders(self, cuts_test: CutSet) -> DataLoader: + logging.debug("About to create test dataset") - test = K2SpeechRecognitionDataset( - input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))) - if self.args.on_the_fly_feats - else eval(self.args.input_strategy)(), - return_cuts=self.args.return_cuts, - ) - sampler = DynamicBucketingSampler( - cuts, + if self.args.on_the_fly_feats: + test = K2SpeechRecognitionDataset( + input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))), + return_cuts=self.args.return_cuts, + ) + else: + test = K2SpeechRecognitionDataset( + return_cuts=self.args.return_cuts, + ) + + test_sampler = DynamicBucketingSampler( + cuts_test, max_duration=self.args.max_duration, shuffle=False, ) + logging.debug("About to create test dataloader") test_dl = DataLoader( test, batch_size=None, - sampler=sampler, + sampler=test_sampler, num_workers=self.args.num_workers, + persistent_workers=False, ) return test_dl - - @lru_cache() - def train_clean_10_cuts(self, option=None) -> CutSet: - logging.info("About to get train-clean-10 cuts") - if option is None: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_train-clean-100.jsonl" - ) - else: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_train-clean-10_{option}.jsonl" - ) @lru_cache() - def train_clean_100_cuts(self, option=None) -> CutSet: - logging.info("About to get train-clean-100 cuts") - if option is None: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_train-clean-100.jsonl" - ) - else: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_train-clean-100_{option}.jsonl" - ) - - @lru_cache() - def train_clean_360_cuts(self, option=None) -> CutSet: - logging.info("About to get train-clean-360 cuts") - if option is None: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_train-clean-360.jsonl" - ) - else: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_train-clean-360_{option}.jsonl" - ) - - @lru_cache() - def train_other_500_cuts(self, option=None) -> CutSet: - logging.info("About to get train-other-500 cuts") - if option is None: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_train-other-500.jsonl" - ) - else: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_train-other-500_{option}.jsonl" - ) - - @lru_cache() - def train_all_shuf_cuts(self, option=None) -> CutSet: - logging.info( - "About to get the shuffled train-clean-100, \ - train-clean-360 and train-other-500 cuts" - ) - if option is None: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_train-all-shuf.jsonl" - ) - else: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_train-all-shuf_{option}.jsonl" - ) - - @lru_cache() - def dev_clean_cuts(self, option=None) -> CutSet: - logging.info("About to get dev-clean cuts") - if option is None: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_dev-clean.jsonl" - ) - else: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_dev-clean_{option}.jsonl" - ) - - @lru_cache() - def dev_other_cuts(self, option=None) -> CutSet: - logging.info("About to get dev-other cuts") - if option is None: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_dev-other.jsonl" - ) - else: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_dev-other_{option}.jsonl" - ) - - @lru_cache() - def test_clean_cuts(self, option=None) -> CutSet: - logging.info("About to get test-clean cuts") - if option is None: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_test-clean.jsonl" - ) - elif option == 'user': - json_list = sorted(glob(str(self.args.manifest_dir) + "/userlibri/test-clean/*")) - spk_list = [json.split('/')[-1][:-6] for json in json_list] - - return [load_manifest_lazy(json) for json in json_list], spk_list - else: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_test-clean_{option}.jsonl" - ) - - @lru_cache() - def test_other_cuts(self, option=None) -> CutSet: - logging.info("About to get test-other cuts") - if option is None: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_test-other_{option}.jsonl" - ) - elif option == 'user': - json_list = sorted(glob(str(self.args.manifest_dir) + "/userlibri/test-other/*")) - spk_list = [json.split('/')[-1][:-6] for json in json_list] - - return [load_manifest_lazy(json) for json in json_list], spk_list - else: - return load_manifest_lazy( - self.args.manifest_dir / f"librispeech_cuts_test-other_{option}.jsonl" - ) - - @lru_cache() - def test_clean_user(self, option=None) -> CutSet: - logging.info("About to get test-clean user cuts") + def train_cuts(self) -> CutSet: + logging.info("About to get train cuts") return load_manifest_lazy( - self.args.manifest_dir / f"userlibri/test-clean_sampling/{option}.jsonl" - ) - - @lru_cache() - def test_other_user(self, option=None) -> CutSet: - logging.info("About to get test-other user cuts") - return load_manifest_lazy( - self.args.manifest_dir / f"userlibri/test-other_sampling/{option}.jsonl" - ) - - @lru_cache() - def vox_cuts(self, option=None) -> CutSet: - logging.info("About to get test-other user cuts") - return load_manifest_lazy( - self.args.manifest_dir / f"{self.args.prefix}_cuts_{option}.jsonl.gz" - ) - - @lru_cache() - def userlibri_cuts(self, option=None) -> CutSet: - logging.info("About to get userlibri cuts") - return load_manifest_lazy( - self.args.manifest_dir / f"{option}.jsonl" + self.args.manifest_dir / "tedlium_cuts_train.jsonl.gz" ) + @lru_cache() + def dev_cuts(self) -> CutSet: + logging.info("About to get dev cuts") + return load_manifest_lazy(self.args.manifest_dir / "tedlium_cuts_dev.jsonl.gz") + + @lru_cache() + def test_cuts(self) -> CutSet: + logging.info("About to get test cuts") + return load_manifest_lazy(self.args.manifest_dir / "tedlium_cuts_test.jsonl.gz") diff --git a/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/asr_datamodule_libri.py b/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/asr_datamodule_libri.py new file mode 100644 index 000000000..1ecda2668 --- /dev/null +++ b/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/asr_datamodule_libri.py @@ -0,0 +1,559 @@ +# Copyright 2021 Piotr Żelasko +# Copyright 2022 Xiaomi Corporation (Author: Mingshuang Luo) +# +# See ../../../../LICENSE for clarification regarding multiple authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import argparse +import inspect +import logging +from glob import glob +from functools import lru_cache +from pathlib import Path +from typing import Any, Dict, Optional + +import torch +from lhotse import CutSet, Fbank, FbankConfig, load_manifest, load_manifest_lazy +from lhotse.dataset import ( # noqa F401 for PrecomputedFeatures + CutConcatenate, + CutMix, + DynamicBucketingSampler, + K2SpeechRecognitionDataset, + PrecomputedFeatures, + SingleCutSampler, + SpecAugment, +) +from lhotse.dataset.input_strategies import ( # noqa F401 For AudioSamples + AudioSamples, + OnTheFlyFeatures, +) +from lhotse.utils import fix_random_seed +from torch.utils.data import DataLoader + +from icefall.utils import str2bool + + +class _SeedWorkers: + def __init__(self, seed: int): + self.seed = seed + + def __call__(self, worker_id: int): + fix_random_seed(self.seed + worker_id) + + +class LibriSpeechAsrDataModule: + """ + DataModule for k2 ASR experiments. + It assumes there is always one train and valid dataloader, + but there can be multiple test dataloaders (e.g. LibriSpeech test-clean + and test-other). + + It contains all the common data pipeline modules used in ASR + experiments, e.g.: + - dynamic batch size, + - bucketing samplers, + - cut concatenation, + - augmentation, + - on-the-fly feature extraction + + This class should be derived for specific corpora used in ASR tasks. + """ + + def __init__(self, args: argparse.Namespace): + self.args = args + + @classmethod + def add_arguments(cls, parser: argparse.ArgumentParser): + group = parser.add_argument_group( + title="ASR data related options", + description="These options are used for the preparation of " + "PyTorch DataLoaders from Lhotse CutSet's -- they control the " + "effective batch sizes, sampling strategies, applied data " + "augmentations, etc.", + ) + group.add_argument( + "--full-libri", + type=str2bool, + default=False, + help="When enabled, use 960h LibriSpeech. Otherwise, use 100h subset.", + ) + group.add_argument( + "--manifest-dir", + type=Path, + default=Path("data/fbank"), + help="Path to directory with train/valid/test cuts.", + ) + group.add_argument( + "--max-duration", + type=int, + default=250.0, + help="Maximum pooled recordings duration (seconds) in a " + "single batch. You can reduce it if it causes CUDA OOM.", + ) + group.add_argument( + "--bucketing-sampler", + type=str2bool, + default=True, + help="When enabled, the batches will come from buckets of " + "similar duration (saves padding frames).", + ) + group.add_argument( + "--num-buckets", + type=int, + default=30, + help="The number of buckets for the DynamicBucketingSampler" + "(you might want to increase it for larger datasets).", + ) + group.add_argument( + "--concatenate-cuts", + type=str2bool, + default=False, + help="When enabled, utterances (cuts) will be concatenated " + "to minimize the amount of padding.", + ) + group.add_argument( + "--duration-factor", + type=float, + default=1.0, + help="Determines the maximum duration of a concatenated cut " + "relative to the duration of the longest cut in a batch.", + ) + group.add_argument( + "--gap", + type=float, + default=1.0, + help="The amount of padding (in seconds) inserted between " + "concatenated cuts. This padding is filled with noise when " + "noise augmentation is used.", + ) + group.add_argument( + "--on-the-fly-feats", + type=str2bool, + default=False, + help="When enabled, use on-the-fly cut mixing and feature " + "extraction. Will drop existing precomputed feature manifests " + "if available.", + ) + group.add_argument( + "--shuffle", + type=str2bool, + default=True, + help="When enabled (=default), the examples will be " + "shuffled for each epoch.", + ) + group.add_argument( + "--drop-last", + type=str2bool, + default=True, + help="Whether to drop last batch. Used by sampler.", + ) + group.add_argument( + "--return-cuts", + type=str2bool, + default=True, + help="When enabled, each batch will have the " + "field: batch['supervisions']['cut'] with the cuts that " + "were used to construct it.", + ) + + group.add_argument( + "--num-workers", + type=int, + default=2, + help="The number of training dataloader workers that " + "collect the batches.", + ) + + group.add_argument( + "--enable-spec-aug", + type=str2bool, + default=False, + help="When enabled, use SpecAugment for training dataset.", + ) + + group.add_argument( + "--spec-aug-time-warp-factor", + type=int, + default=80, + help="Used only when --enable-spec-aug is True. " + "It specifies the factor for time warping in SpecAugment. " + "Larger values mean more warping. " + "A value less than 1 means to disable time warp.", + ) + + group.add_argument( + "--enable-musan", + type=str2bool, + default=True, + help="When enabled, select noise from MUSAN and mix it" + "with training dataset. ", + ) + + group.add_argument( + "--input-strategy", + type=str, + default="AudioSamples", + help="AudioSamples or PrecomputedFeatures", + ) + + group.add_argument( + "--spk-id", + type=int, + default=0, + ) + + group.add_argument( + "--prefix", + type=str, + default='vox', + ) + + def train_dataloaders( + self, + cuts_train: CutSet, + sampler_state_dict: Optional[Dict[str, Any]] = None, + ) -> DataLoader: + """ + Args: + cuts_train: + CutSet for training. + sampler_state_dict: + The state dict for the training sampler. + """ + transforms = [] + if self.args.enable_musan: + logging.info("Enable MUSAN") + logging.info("About to get Musan cuts") + cuts_musan = load_manifest(self.args.manifest_dir / "musan_cuts.jsonl.gz") + transforms.append( + CutMix(cuts=cuts_musan, prob=0.5, snr=(10, 20), preserve_id=True) + ) + else: + logging.info("Disable MUSAN") + + if self.args.concatenate_cuts: + logging.info( + f"Using cut concatenation with duration factor " + f"{self.args.duration_factor} and gap {self.args.gap}." + ) + # Cut concatenation should be the first transform in the list, + # so that if we e.g. mix noise in, it will fill the gaps between + # different utterances. + transforms = [ + CutConcatenate( + duration_factor=self.args.duration_factor, gap=self.args.gap + ) + ] + transforms + + input_transforms = [] + if self.args.enable_spec_aug: + logging.info("Enable SpecAugment") + logging.info(f"Time warp factor: {self.args.spec_aug_time_warp_factor}") + # Set the value of num_frame_masks according to Lhotse's version. + # In different Lhotse's versions, the default of num_frame_masks is + # different. + num_frame_masks = 10 + num_frame_masks_parameter = inspect.signature( + SpecAugment.__init__ + ).parameters["num_frame_masks"] + if num_frame_masks_parameter.default == 1: + num_frame_masks = 2 + logging.info(f"Num frame mask: {num_frame_masks}") + input_transforms.append( + SpecAugment( + time_warp_factor=self.args.spec_aug_time_warp_factor, + num_frame_masks=num_frame_masks, + features_mask_size=27, + num_feature_masks=2, + frames_mask_size=100, + ) + ) + else: + logging.info("Disable SpecAugment") + + logging.info("About to create train dataset") + train = K2SpeechRecognitionDataset( + input_strategy=eval(self.args.input_strategy)(), + cut_transforms=transforms, + input_transforms=input_transforms, + return_cuts=self.args.return_cuts, + ) + + if self.args.on_the_fly_feats: + # NOTE: the PerturbSpeed transform should be added only if we + # remove it from data prep stage. + # Add on-the-fly speed perturbation; since originally it would + # have increased epoch size by 3, we will apply prob 2/3 and use + # 3x more epochs. + # Speed perturbation probably should come first before + # concatenation, but in principle the transforms order doesn't have + # to be strict (e.g. could be randomized) + # transforms = [PerturbSpeed(factors=[0.9, 1.1], p=2/3)] + transforms # noqa + # Drop feats to be on the safe side. + train = K2SpeechRecognitionDataset( + cut_transforms=transforms, + input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))), + input_transforms=input_transforms, + return_cuts=self.args.return_cuts, + ) + + if self.args.bucketing_sampler: + logging.info("Using DynamicBucketingSampler.") + train_sampler = DynamicBucketingSampler( + cuts_train, + max_duration=self.args.max_duration, + shuffle=self.args.shuffle, + num_buckets=self.args.num_buckets, + drop_last=self.args.drop_last, + ) + else: + logging.info("Using SingleCutSampler.") + train_sampler = SingleCutSampler( + cuts_train, + max_duration=self.args.max_duration, + shuffle=self.args.shuffle, + ) + logging.info("About to create train dataloader") + + if sampler_state_dict is not None: + logging.info("Loading sampler state dict") + train_sampler.load_state_dict(sampler_state_dict) + + # 'seed' is derived from the current random state, which will have + # previously been set in the main process. + seed = torch.randint(0, 100000, ()).item() + worker_init_fn = _SeedWorkers(seed) + + train_dl = DataLoader( + train, + sampler=train_sampler, + batch_size=None, + num_workers=self.args.num_workers, + persistent_workers=False, + worker_init_fn=worker_init_fn, + ) + + return train_dl + + def valid_dataloaders(self, cuts_valid: CutSet) -> DataLoader: + transforms = [] + if self.args.concatenate_cuts: + transforms = [ + CutConcatenate( + duration_factor=self.args.duration_factor, gap=self.args.gap + ) + ] + transforms + + logging.info("About to create dev dataset") + if self.args.on_the_fly_feats: + validate = K2SpeechRecognitionDataset( + cut_transforms=transforms, + input_strategy=eval(self.args.input_strategy)(), + #input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))), + return_cuts=self.args.return_cuts, + ) + else: + validate = K2SpeechRecognitionDataset( + cut_transforms=transforms, + input_strategy=eval(self.args.input_strategy)(), + return_cuts=self.args.return_cuts, + ) + valid_sampler = DynamicBucketingSampler( + cuts_valid, + max_duration=self.args.max_duration, + shuffle=False, + ) + logging.info("About to create dev dataloader") + valid_dl = DataLoader( + validate, + sampler=valid_sampler, + batch_size=None, + num_workers=2, + persistent_workers=False, + ) + + return valid_dl + + def test_dataloaders(self, cuts: CutSet) -> DataLoader: + logging.debug("About to create test dataset") + test = K2SpeechRecognitionDataset( + input_strategy=OnTheFlyFeatures(Fbank(FbankConfig(num_mel_bins=80))) + if self.args.on_the_fly_feats + else eval(self.args.input_strategy)(), + return_cuts=self.args.return_cuts, + ) + sampler = DynamicBucketingSampler( + cuts, + max_duration=self.args.max_duration, + shuffle=False, + ) + logging.debug("About to create test dataloader") + test_dl = DataLoader( + test, + batch_size=None, + sampler=sampler, + num_workers=self.args.num_workers, + ) + return test_dl + + @lru_cache() + def train_clean_10_cuts(self, option=None) -> CutSet: + logging.info("About to get train-clean-10 cuts") + if option is None: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_train-clean-100.jsonl" + ) + else: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_train-clean-10_{option}.jsonl" + ) + + @lru_cache() + def train_clean_100_cuts(self, option=None) -> CutSet: + logging.info("About to get train-clean-100 cuts") + if option is None: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_train-clean-100.jsonl" + ) + else: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_train-clean-100_{option}.jsonl" + ) + + @lru_cache() + def train_clean_360_cuts(self, option=None) -> CutSet: + logging.info("About to get train-clean-360 cuts") + if option is None: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_train-clean-360.jsonl" + ) + else: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_train-clean-360_{option}.jsonl" + ) + + @lru_cache() + def train_other_500_cuts(self, option=None) -> CutSet: + logging.info("About to get train-other-500 cuts") + if option is None: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_train-other-500.jsonl" + ) + else: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_train-other-500_{option}.jsonl" + ) + + @lru_cache() + def train_all_shuf_cuts(self, option=None) -> CutSet: + logging.info( + "About to get the shuffled train-clean-100, \ + train-clean-360 and train-other-500 cuts" + ) + if option is None: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_train-all-shuf.jsonl" + ) + else: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_train-all-shuf_{option}.jsonl" + ) + + @lru_cache() + def dev_clean_cuts(self, option=None) -> CutSet: + logging.info("About to get dev-clean cuts") + if option is None: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_dev-clean.jsonl" + ) + else: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_dev-clean_{option}.jsonl" + ) + + @lru_cache() + def dev_other_cuts(self, option=None) -> CutSet: + logging.info("About to get dev-other cuts") + if option is None: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_dev-other.jsonl" + ) + else: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_dev-other_{option}.jsonl" + ) + + @lru_cache() + def test_clean_cuts(self, option=None) -> CutSet: + logging.info("About to get test-clean cuts") + if option is None: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_test-clean.jsonl" + ) + elif option == 'user': + json_list = sorted(glob(str(self.args.manifest_dir) + "/userlibri/test-clean/*")) + spk_list = [json.split('/')[-1][:-6] for json in json_list] + + return [load_manifest_lazy(json) for json in json_list], spk_list + else: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_test-clean_{option}.jsonl" + ) + + @lru_cache() + def test_other_cuts(self, option=None) -> CutSet: + logging.info("About to get test-other cuts") + if option is None: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_test-other_{option}.jsonl" + ) + elif option == 'user': + json_list = sorted(glob(str(self.args.manifest_dir) + "/userlibri/test-other/*")) + spk_list = [json.split('/')[-1][:-6] for json in json_list] + + return [load_manifest_lazy(json) for json in json_list], spk_list + else: + return load_manifest_lazy( + self.args.manifest_dir / f"librispeech_cuts_test-other_{option}.jsonl" + ) + + @lru_cache() + def test_clean_user(self, option=None) -> CutSet: + logging.info("About to get test-clean user cuts") + return load_manifest_lazy( + self.args.manifest_dir / f"userlibri/test-clean_sampling/{option}.jsonl" + ) + + @lru_cache() + def test_other_user(self, option=None) -> CutSet: + logging.info("About to get test-other user cuts") + return load_manifest_lazy( + self.args.manifest_dir / f"userlibri/test-other_sampling/{option}.jsonl" + ) + + @lru_cache() + def vox_cuts(self, option=None) -> CutSet: + logging.info("About to get test-other user cuts") + return load_manifest_lazy( + self.args.manifest_dir / f"{self.args.prefix}_cuts_{option}.jsonl.gz" + ) + + @lru_cache() + def userlibri_cuts(self, option=None) -> CutSet: + logging.info("About to get userlibri cuts") + return load_manifest_lazy( + self.args.manifest_dir / f"{option}.jsonl" + ) + diff --git a/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/bias_compare.py b/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/bias_compare.py deleted file mode 100644 index 1c18fec88..000000000 --- a/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/bias_compare.py +++ /dev/null @@ -1,11 +0,0 @@ -import torch - -base_model = torch.load('./d2v-base-T.pt') -bias_model = torch.load('./bitfit_533_v2/checkpoint-100.pt') - -base_model, bias_model = base_model['model'], bias_model['model'] - -for key in base_model.keys(): - if 'bias' in key: - l1_diff = torch.abs(base_model[key]-bias_model[key]).sum() / base_model[key].size(0) - print(key, l1_diff.item()) diff --git a/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/decode_new.py b/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/decode_new.py deleted file mode 100755 index d245eabf5..000000000 --- a/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/decode_new.py +++ /dev/null @@ -1,834 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright 2021-2022 Xiaomi Corporation (Author: Fangjun Kuang, -# Zengwei Yao) -# -# See ../../../../LICENSE for clarification regarding multiple authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Usage: -(0) for d2v-T decoding -for method in greedy_search modified_beam_search fast_beam_search; do - ./pruned_transducer_stateless_d2v_v2/decode.py \ - --input-strategy AudioSamples \ - --enable-spec-aug False \ - --additional-block True \ - --model-name epoc.pt \ - --exp-dir ./pruned_transducer_stateless_d2v_v2/960h_sweep_v3_388 \ - --max-duration 400 \ - --decoding-method $method \ - --max-sym-per-frame 1 \ - --encoder-type d2v \ - --encoder-dim 768 \ - --decoder-dim 768 \ - --joiner-dim 768 -done -""" - - -import argparse -import logging -import math -from collections import defaultdict -from pathlib import Path -from typing import Dict, List, Optional, Tuple - -import k2 -import sentencepiece as spm -import torch -import torch.nn as nn -from asr_datamodule import LibriSpeechAsrDataModule -from beam_search import ( - beam_search, - fast_beam_search_nbest, - fast_beam_search_nbest_LG, - fast_beam_search_nbest_oracle, - fast_beam_search_one_best, - greedy_search, - greedy_search_batch, - modified_beam_search, -) -#from train import add_model_arguments, add_rep_arguments, get_params, get_transducer_model -from prompt_tuning import add_model_arguments, add_rep_arguments, get_params, get_transducer_model - -from icefall.checkpoint import ( - average_checkpoints, - average_checkpoints_with_averaged_model, - find_checkpoints, - load_checkpoint, -) -from icefall.lexicon import Lexicon -from icefall.utils import ( - AttributeDict, - setup_logger, - store_transcripts, - str2bool, - write_error_stats, -) - -from train_lora import LoRAHook - -LOG_EPS = math.log(1e-10) - - -def get_parser(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument( - "--model-name", - type=str, - default="", - help="""It specifies the model file name to use for decoding.""", - ) - - parser.add_argument( - "--epoch", - type=int, - default=30, - help="""It specifies the checkpoint to use for decoding. - Note: Epoch counts from 1. - You can specify --avg to use more checkpoints for model averaging.""", - ) - - parser.add_argument( - "--iter", - type=int, - default=0, - help="""If positive, --epoch is ignored and it - will use the checkpoint exp_dir/checkpoint-iter.pt. - You can specify --avg to use more checkpoints for model averaging. - """, - ) - - parser.add_argument( - "--avg", - type=int, - default=9, - help="Number of checkpoints to average. Automatically select " - "consecutive checkpoints before the checkpoint specified by " - "'--epoch' and '--iter'", - ) - - parser.add_argument( - "--use-averaged-model", - type=str2bool, - default=True, - help="Whether to load averaged model. Currently it only supports " - "using --epoch. If True, it would decode with the averaged model " - "over the epoch range from `epoch-avg` (excluded) to `epoch`." - "Actually only the models with epoch number of `epoch-avg` and " - "`epoch` are loaded for averaging. ", - ) - - parser.add_argument( - "--exp-dir", - type=str, - default="pruned_transducer_stateless7_ctc/exp", - help="The experiment dir", - ) - - parser.add_argument( - "--bpe-model", - type=str, - default="data/lang_bpe_500/bpe.model", - help="Path to the BPE model", - ) - - parser.add_argument( - "--lang-dir", - type=Path, - default="data/lang_bpe_500", - help="The lang dir containing word table and LG graph", - ) - - parser.add_argument( - "--decoding-method", - type=str, - default="greedy_search", - help="""Possible values are: - - greedy_search - - beam_search - - modified_beam_search - - fast_beam_search - - fast_beam_search_nbest - - fast_beam_search_nbest_oracle - - fast_beam_search_nbest_LG - If you use fast_beam_search_nbest_LG, you have to specify - `--lang-dir`, which should contain `LG.pt`. - """, - ) - - parser.add_argument( - "--beam-size", - type=int, - default=4, - help="""An integer indicating how many candidates we will keep for each - frame. Used only when --decoding-method is beam_search or - modified_beam_search.""", - ) - - parser.add_argument( - "--beam", - type=float, - default=20.0, - help="""A floating point value to calculate the cutoff score during beam - search (i.e., `cutoff = max-score - beam`), which is the same as the - `beam` in Kaldi. - Used only when --decoding-method is fast_beam_search, - fast_beam_search_nbest, fast_beam_search_nbest_LG, - and fast_beam_search_nbest_oracle - """, - ) - - parser.add_argument( - "--ngram-lm-scale", - type=float, - default=0.01, - help=""" - Used only when --decoding_method is fast_beam_search_nbest_LG. - It specifies the scale for n-gram LM scores. - """, - ) - - parser.add_argument( - "--max-contexts", - type=int, - default=8, - help="""Used only when --decoding-method is - fast_beam_search, fast_beam_search_nbest, fast_beam_search_nbest_LG, - and fast_beam_search_nbest_oracle""", - ) - - parser.add_argument( - "--max-states", - type=int, - default=64, - help="""Used only when --decoding-method is - fast_beam_search, fast_beam_search_nbest, fast_beam_search_nbest_LG, - and fast_beam_search_nbest_oracle""", - ) - - parser.add_argument( - "--context-size", - type=int, - default=2, - help="The context size in the decoder. 1 means bigram; 2 means tri-gram", - ) - parser.add_argument( - "--max-sym-per-frame", - type=int, - default=1, - help="""Maximum number of symbols per frame. - Used only when --decoding_method is greedy_search""", - ) - - parser.add_argument( - "--num-paths", - type=int, - default=200, - help="""Number of paths for nbest decoding. - Used only when the decoding method is fast_beam_search_nbest, - fast_beam_search_nbest_LG, and fast_beam_search_nbest_oracle""", - ) - - parser.add_argument( - "--nbest-scale", - type=float, - default=0.5, - help="""Scale applied to lattice scores when computing nbest paths. - Used only when the decoding method is fast_beam_search_nbest, - fast_beam_search_nbest_LG, and fast_beam_search_nbest_oracle""", - ) - - parser.add_argument( - "--simulate-streaming", - type=str2bool, - default=False, - help="""Whether to simulate streaming in decoding, this is a good way to - test a streaming model. - """, - ) - - parser.add_argument( - "--decode-chunk-size", - type=int, - default=16, - help="The chunk size for decoding (in frames after subsampling)", - ) - - parser.add_argument( - "--left-context", - type=int, - default=64, - help="left context can be seen during decoding (in frames after subsampling)", - ) - - parser.add_argument( - "--res-name", - type=str, - ) - - add_model_arguments(parser) - add_rep_arguments(parser) - - return parser - - -def decode_one_batch( - params: AttributeDict, - model: nn.Module, - sp: spm.SentencePieceProcessor, - batch: dict, - word_table: Optional[k2.SymbolTable] = None, - decoding_graph: Optional[k2.Fsa] = None, -) -> Dict[str, List[List[str]]]: - """Decode one batch and return the result in a dict. The dict has the - following format: - - - key: It indicates the setting used for decoding. For example, - if greedy_search is used, it would be "greedy_search" - If beam search with a beam size of 7 is used, it would be - "beam_7" - - value: It contains the decoding result. `len(value)` equals to - batch size. `value[i]` is the decoding result for the i-th - utterance in the given batch. - Args: - params: - It's the return value of :func:`get_params`. - model: - The neural model. - sp: - The BPE model. - batch: - It is the return value from iterating - `lhotse.dataset.K2SpeechRecognitionDataset`. See its documentation - for the format of the `batch`. - word_table: - The word symbol table. - decoding_graph: - The decoding graph. Can be either a `k2.trivial_graph` or HLG, Used - only when --decoding_method is fast_beam_search, fast_beam_search_nbest, - fast_beam_search_nbest_oracle, and fast_beam_search_nbest_LG. - Returns: - Return the decoding result. See above description for the format of - the returned dict. - """ - device = next(model.parameters()).device - feature = batch["inputs"] - assert feature.ndim == 2 or feature.ndim == 3 - - feature = feature.to(device) - # at entry, feature is (N, T, C) - - supervisions = batch["supervisions"] - #feature_lens = supervisions["num_frames"].to(device) - if feature.ndim == 2: - feature_lens = [] - for supervision in supervisions['cut']: - try: feature_lens.append(supervision.tracks[0].cut.recording.num_samples) - except: feature_lens.append(supervision.recording.num_samples) - feature_lens = torch.tensor(feature_lens) - - elif feature.ndim == 3: - feature_lens = supervisions["num_frames"].to(device) - - if params.simulate_streaming: - feature_lens += params.left_context - feature = torch.nn.functional.pad( - feature, - pad=(0, 0, 0, params.left_context), - value=LOG_EPS, - ) - encoder_out, encoder_out_lens, _ = model.encoder.streaming_forward( - x=feature, - x_lens=feature_lens, - chunk_size=params.decode_chunk_size, - left_context=params.left_context, - simulate_streaming=True, - ) - else: - encoder_out, encoder_out_lens = model.encoder(x=feature, x_lens=feature_lens) - - hyps = [] - - if params.decoding_method == "fast_beam_search": - hyp_tokens = fast_beam_search_one_best( - model=model, - decoding_graph=decoding_graph, - encoder_out=encoder_out, - encoder_out_lens=encoder_out_lens, - beam=params.beam, - max_contexts=params.max_contexts, - max_states=params.max_states, - ) - for hyp in sp.decode(hyp_tokens): - hyps.append(hyp.split()) - elif params.decoding_method == "fast_beam_search_nbest_LG": - hyp_tokens = fast_beam_search_nbest_LG( - model=model, - decoding_graph=decoding_graph, - encoder_out=encoder_out, - encoder_out_lens=encoder_out_lens, - beam=params.beam, - max_contexts=params.max_contexts, - max_states=params.max_states, - num_paths=params.num_paths, - nbest_scale=params.nbest_scale, - ) - for hyp in hyp_tokens: - hyps.append([word_table[i] for i in hyp]) - elif params.decoding_method == "fast_beam_search_nbest": - hyp_tokens = fast_beam_search_nbest( - model=model, - decoding_graph=decoding_graph, - encoder_out=encoder_out, - encoder_out_lens=encoder_out_lens, - beam=params.beam, - max_contexts=params.max_contexts, - max_states=params.max_states, - num_paths=params.num_paths, - nbest_scale=params.nbest_scale, - ) - for hyp in sp.decode(hyp_tokens): - hyps.append(hyp.split()) - elif params.decoding_method == "fast_beam_search_nbest_oracle": - hyp_tokens = fast_beam_search_nbest_oracle( - model=model, - decoding_graph=decoding_graph, - encoder_out=encoder_out, - encoder_out_lens=encoder_out_lens, - beam=params.beam, - max_contexts=params.max_contexts, - max_states=params.max_states, - num_paths=params.num_paths, - ref_texts=sp.encode(supervisions["text"]), - nbest_scale=params.nbest_scale, - ) - for hyp in sp.decode(hyp_tokens): - hyps.append(hyp.split()) - elif params.decoding_method == "greedy_search" and params.max_sym_per_frame == 1: - hyp_tokens = greedy_search_batch( - model=model, - encoder_out=encoder_out, - encoder_out_lens=encoder_out_lens, - ) - for hyp in sp.decode(hyp_tokens): - hyps.append(hyp.split()) - elif params.decoding_method == "modified_beam_search": - hyp_tokens = modified_beam_search( - model=model, - encoder_out=encoder_out, - encoder_out_lens=encoder_out_lens, - beam=params.beam_size, - ) - for hyp in sp.decode(hyp_tokens): - hyps.append(hyp.split()) - else: - batch_size = encoder_out.size(0) - - for i in range(batch_size): - # fmt: off - encoder_out_i = encoder_out[i:i+1, :encoder_out_lens[i]] - # fmt: on - if params.decoding_method == "greedy_search": - hyp = greedy_search( - model=model, - encoder_out=encoder_out_i, - max_sym_per_frame=params.max_sym_per_frame, - ) - elif params.decoding_method == "beam_search": - hyp = beam_search( - model=model, - encoder_out=encoder_out_i, - beam=params.beam_size, - ) - else: - raise ValueError( - f"Unsupported decoding method: {params.decoding_method}" - ) - hyps.append(sp.decode(hyp).split()) - - if params.decoding_method == "greedy_search": - return {"greedy_search": hyps} - elif "fast_beam_search" in params.decoding_method: - key = f"beam_{params.beam}_" - key += f"max_contexts_{params.max_contexts}_" - key += f"max_states_{params.max_states}" - if "nbest" in params.decoding_method: - key += f"_num_paths_{params.num_paths}_" - key += f"nbest_scale_{params.nbest_scale}" - if "LG" in params.decoding_method: - key += f"_ngram_lm_scale_{params.ngram_lm_scale}" - - return {key: hyps} - else: - return {f"beam_size_{params.beam_size}": hyps} - - -def decode_dataset( - dl: torch.utils.data.DataLoader, - params: AttributeDict, - model: nn.Module, - sp: spm.SentencePieceProcessor, - word_table: Optional[k2.SymbolTable] = None, - decoding_graph: Optional[k2.Fsa] = None, -) -> Dict[str, List[Tuple[str, List[str], List[str]]]]: - """Decode dataset. - - Args: - dl: - PyTorch's dataloader containing the dataset to decode. - params: - It is returned by :func:`get_params`. - model: - The neural model. - sp: - The BPE model. - word_table: - The word symbol table. - decoding_graph: - The decoding graph. Can be either a `k2.trivial_graph` or HLG, Used - only when --decoding_method is fast_beam_search, fast_beam_search_nbest, - fast_beam_search_nbest_oracle, and fast_beam_search_nbest_LG. - Returns: - Return a dict, whose key may be "greedy_search" if greedy search - is used, or it may be "beam_7" if beam size of 7 is used. - Its value is a list of tuples. Each tuple contains two elements: - The first is the reference transcript, and the second is the - predicted result. - """ - num_cuts = 0 - - try: - num_batches = len(dl) - except TypeError: - num_batches = "?" - - if params.decoding_method == "greedy_search": - log_interval = 50 - else: - log_interval = 20 - - results = defaultdict(list) - for batch_idx, batch in enumerate(dl): - texts = batch["supervisions"]["text"] - cut_ids = [cut.id for cut in batch["supervisions"]["cut"]] - - hyps_dict = decode_one_batch( - params=params, - model=model, - sp=sp, - decoding_graph=decoding_graph, - word_table=word_table, - batch=batch, - ) - - for name, hyps in hyps_dict.items(): - this_batch = [] - assert len(hyps) == len(texts) - for cut_id, hyp_words, ref_text in zip(cut_ids, hyps, texts): - ref_words = ref_text.split() - this_batch.append((cut_id, ref_words, hyp_words)) - - results[name].extend(this_batch) - - num_cuts += len(texts) - - if batch_idx % log_interval == 0: - batch_str = f"{batch_idx}/{num_batches}" - - logging.info(f"batch {batch_str}, cuts processed until now is {num_cuts}") - return results - - -def save_results( - params: AttributeDict, - test_set_name: str, - results_dict: Dict[str, List[Tuple[str, List[str], List[str]]]], -): - test_set_wers = dict() - for key, results in results_dict.items(): - recog_path = ( - params.res_dir / f"recogs-{test_set_name}-{key}-{params.suffix}.txt" - ) - results = sorted(results) - store_transcripts(filename=recog_path, texts=results) - logging.info(f"The transcripts are stored in {recog_path}") - - # The following prints out WERs, per-word error statistics and aligned - # ref/hyp pairs. - errs_filename = ( - params.res_dir / f"errs-{test_set_name}-{key}-{params.suffix}.txt" - ) - with open(errs_filename, "w") as f: - wer = write_error_stats( - f, f"{test_set_name}-{key}", results, enable_log=True - ) - test_set_wers[key] = wer - - logging.info("Wrote detailed error stats to {}".format(errs_filename)) - - test_set_wers = sorted(test_set_wers.items(), key=lambda x: x[1]) - errs_info = ( - params.res_dir / f"wer-summary-{test_set_name}-{key}-{params.suffix}.txt" - ) - with open(errs_info, "w") as f: - print("settings\tWER", file=f) - for key, val in test_set_wers: - print("{}\t{}".format(key, val), file=f) - - s = "\nFor {}, WER of different settings are:\n".format(test_set_name) - note = "\tbest for {}".format(test_set_name) - for key, val in test_set_wers: - s += "{}\t{}{}\n".format(key, val, note) - note = "" - logging.info(s) - - -@torch.no_grad() -def main(): - parser = get_parser() - LibriSpeechAsrDataModule.add_arguments(parser) - args = parser.parse_args() - args.exp_dir = Path(args.exp_dir) - - params = get_params() - params.update(vars(args)) - - assert params.decoding_method in ( - "greedy_search", - "beam_search", - "fast_beam_search", - "fast_beam_search_nbest", - "fast_beam_search_nbest_LG", - "fast_beam_search_nbest_oracle", - "modified_beam_search", - ) - params.res_dir = params.exp_dir / params.decoding_method - - if params.iter > 0: - params.suffix = f"iter-{params.iter}-avg-{params.avg}" - else: - params.suffix = f"epoch-{params.epoch}-avg-{params.avg}" - - if params.simulate_streaming: - params.suffix += f"-streaming-chunk-size-{params.decode_chunk_size}" - params.suffix += f"-left-context-{params.left_context}" - - if "fast_beam_search" in params.decoding_method: - params.suffix += f"-beam-{params.beam}" - params.suffix += f"-max-contexts-{params.max_contexts}" - params.suffix += f"-max-states-{params.max_states}" - if "nbest" in params.decoding_method: - params.suffix += f"-nbest-scale-{params.nbest_scale}" - params.suffix += f"-num-paths-{params.num_paths}" - if "LG" in params.decoding_method: - params.suffix += f"-ngram-lm-scale-{params.ngram_lm_scale}" - elif "beam_search" in params.decoding_method: - params.suffix += f"-{params.decoding_method}-beam-size-{params.beam_size}" - else: - params.suffix += f"-context-{params.context_size}" - params.suffix += f"-max-sym-per-frame-{params.max_sym_per_frame}" - - if params.use_averaged_model: - params.suffix += "-use-averaged-model" - - setup_logger(f"{params.res_dir}/log-decode-{params.suffix}") - logging.info("Decoding started") - - device = torch.device("cpu") - if torch.cuda.is_available(): - device = torch.device("cuda", 0) - - logging.info(f"Device: {device}") - - sp = spm.SentencePieceProcessor() - sp.load(params.bpe_model) - - # and are defined in local/train_bpe_model.py - params.blank_id = sp.piece_to_id("") - params.unk_id = sp.piece_to_id("") - params.vocab_size = sp.get_piece_size() - - if params.simulate_streaming: - assert ( - params.causal_convolution - ), "Decoding in streaming requires causal convolution" - - logging.info(params) - - logging.info("About to create model") - model = get_transducer_model(params) - - if '.pt' in params.model_name: - load_checkpoint(f"{params.exp_dir}/{params.model_name}", model) - elif 'lora' in params.model_name: - load_checkpoint(f"{params.exp_dir}/../d2v-base-T.pt", model) - - ## for lora hooking - lora_modules = [] - for modules in model.modules(): - if isinstance(modules, fairseq.modules.multihead_attention.MultiheadAttention): - for module in modules.modules(): - if isinstance(module, torch.nn.Linear): - lora_modules.append(LoRAHook(module)) - - for i, lora in enumerate(lora_modules): - lora_param = torch.load(f"{params.exp_dir}/lora_{params.iter}_{i}.pt") - lora.lora.load_state_dict(lora_param) - lora.lora.to(device) - logging.info("lora params load done") - else: - if not params.use_averaged_model: - if params.iter > 0: - filenames = find_checkpoints(params.exp_dir, iteration=-params.iter)[ - : params.avg - ] - if len(filenames) == 0: - raise ValueError( - f"No checkpoints found for" - f" --iter {params.iter}, --avg {params.avg}" - ) - elif len(filenames) < params.avg: - raise ValueError( - f"Not enough checkpoints ({len(filenames)}) found for" - f" --iter {params.iter}, --avg {params.avg}" - ) - logging.info(f"averaging {filenames}") - model.to(device) - model.load_state_dict(average_checkpoints(filenames, device=device)) - elif params.avg == 1: - load_checkpoint(f"{params.exp_dir}/epoch-{params.epoch}.pt", model) - else: - start = params.epoch - params.avg + 1 - filenames = [] - for i in range(start, params.epoch + 1): - if i >= 1: - filenames.append(f"{params.exp_dir}/epoch-{i}.pt") - logging.info(f"averaging {filenames}") - model.to(device) - model.load_state_dict(average_checkpoints(filenames, device=device)) - else: - if params.iter > 0: - filenames = find_checkpoints(params.exp_dir, iteration=-params.iter)[ - : params.avg + 1 - ] - if len(filenames) == 0: - raise ValueError( - f"No checkpoints found for" - f" --iter {params.iter}, --avg {params.avg}" - ) - elif len(filenames) < params.avg + 1: - raise ValueError( - f"Not enough checkpoints ({len(filenames)}) found for" - f" --iter {params.iter}, --avg {params.avg}" - ) - filename_start = filenames[-1] - filename_end = filenames[0] - logging.info( - "Calculating the averaged model over iteration checkpoints" - f" from {filename_start} (excluded) to {filename_end}" - ) - model.to(device) - model.load_state_dict( - average_checkpoints_with_averaged_model( - filename_start=filename_start, - filename_end=filename_end, - device=device, - ) - ) - else: - assert params.avg > 0, params.avg - start = params.epoch - params.avg - assert start >= 1, start - filename_start = f"{params.exp_dir}/epoch-{start}.pt" - filename_end = f"{params.exp_dir}/epoch-{params.epoch}.pt" - logging.info( - f"Calculating the averaged model over epoch range from " - f"{start} (excluded) to {params.epoch}" - ) - model.to(device) - model.load_state_dict( - average_checkpoints_with_averaged_model( - filename_start=filename_start, - filename_end=filename_end, - device=device, - ) - ) - - model.to(device) - model.eval() - - if "fast_beam_search" in params.decoding_method: - if params.decoding_method == "fast_beam_search_nbest_LG": - lexicon = Lexicon(params.lang_dir) - word_table = lexicon.word_table - lg_filename = params.lang_dir / "LG.pt" - logging.info(f"Loading {lg_filename}") - decoding_graph = k2.Fsa.from_dict( - torch.load(lg_filename, map_location=device) - ) - decoding_graph.scores *= params.ngram_lm_scale - else: - word_table = None - decoding_graph = k2.trivial_graph(params.vocab_size - 1, device=device) - else: - decoding_graph = None - word_table = None - - num_param = sum([p.numel() for p in model.parameters()]) - logging.info(f"Number of model parameters: {num_param}") - - # we need cut ids to display recognition results. - args.return_cuts = True - librispeech = LibriSpeechAsrDataModule(args) - - ''' - test_clean_cuts = librispeech.test_clean_cuts() - test_other_cuts = librispeech.test_other_cuts() - - test_clean_dl = librispeech.test_dataloaders(test_clean_cuts) - test_other_dl = librispeech.test_dataloaders(test_other_cuts) - - test_sets = ["test-clean", "test-other"] - test_dl = [test_clean_dl, test_other_dl] - ''' - - test_clean_cuts = librispeech.userlibri_cuts(option=params.spk_id) - test_clean_dl = librispeech.test_dataloaders(test_clean_cuts) - test_sets = [f"{params.spk_id}"] - test_dl = [test_clean_dl] - - for test_set, test_dl in zip(test_sets, test_dl): - results_dict = decode_dataset( - dl=test_dl, - params=params, - model=model, - sp=sp, - word_table=word_table, - decoding_graph=decoding_graph, - ) - - save_results( - params=params, - test_set_name=test_set, - results_dict=results_dict, - ) - - logging.info("Done!") - - -if __name__ == "__main__": - main() diff --git a/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/train_uda.py b/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/train_uda.py deleted file mode 100755 index c25ab54aa..000000000 --- a/egs/tedlium3/ASR/pruned_transducer_stateless_d2v_v2/train_uda.py +++ /dev/null @@ -1,1960 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2021-2022 Xiaomi Corp. (authors: Fangjun Kuang, -# Wei Kang, -# Mingshuang Luo,) -# Zengwei Yao) -# -# See ../../../../LICENSE for clarification regarding multiple authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Usage: - -export CUDA_VISIBLE_DEVICES="0,1,2,3" - -./pruned_transducer_stateless7_ctc/train.py \ - --world-size 4 \ - --num-epochs 30 \ - --start-epoch 1 \ - --exp-dir pruned_transducer_stateless7_ctc/exp \ - --full-libri 1 \ - --max-duration 300 - -# For mix precision training: - -./pruned_transducer_stateless7_ctc/train.py \ - --world-size 4 \ - --num-epochs 30 \ - --start-epoch 1 \ - --use-fp16 1 \ - --exp-dir pruned_transducer_stateless7_ctc/exp \ - --full-libri 1 \ - --max-duration 550 - -# For d2v-T training: -export CUDA_VISIBLE_DEVICES="0,1,2,3,4,5,6,7" - -./pruned_transducer_stateless_d2v_v2/train.py \ - --wandb true \ - --input-strategy AudioSamples \ - --enable-spec-aug False \ - --multi-optim True \ - --world-size 8 \ - --num-epochs 30 \ - --start-epoch 1 \ - --full-libri 0 \ - --exp-dir ./pruned_transducer_stateless_d2v_v2/$1 \ - --max-duration 250 \ - --freeze-finetune-updates 2000 \ - --use-fp16 1 \ - --peak-enc-lr 0.001 \ - --peak-dec-lr 0.05 \ - --accum-grads 1 \ - --encoder-type d2v \ - --additional-block True \ - --encoder-dim 768 \ - --decoder-dim 768 \ - --joiner-dim 768 \ - --prune-range 20 \ - --context-size 2 \ - --ctc-loss-scale 0.2 - -""" - - -import random -import argparse -import copy -import logging -import warnings -from pathlib import Path -from shutil import copyfile -from typing import Any, Dict, Optional, Tuple, Union - -import k2 -import optim -import sentencepiece as spm -import torch -import torch.multiprocessing as mp -import torch.nn as nn -from asr_datamodule import LibriSpeechAsrDataModule -from decoder import Decoder -from joiner import Joiner -from lhotse.cut import Cut -from lhotse.dataset.sampling.base import CutSampler -from lhotse.utils import fix_random_seed -from model import Transducer -from optim import Eden, ScaledAdam -from torch import Tensor -from torch.cuda.amp import GradScaler -from torch.nn.parallel import DistributedDataParallel as DDP -from torch.utils.tensorboard import SummaryWriter -from zipformer import Zipformer -from data2vec_encoder import FairSeqData2VecEncoder - -from icefall import diagnostics -from icefall.checkpoint import remove_checkpoints -from icefall.checkpoint import update_averaged_model -from checkpoint import ( - save_checkpoint as save_checkpoint_impl, - save_checkpoint_with_global_batch_idx, - load_checkpoint -) -from icefall.dist import cleanup_dist, setup_dist -from icefall.env import get_env_info -from icefall.hooks import register_inf_check_hooks -from icefall.utils import ( - AttributeDict, - MetricsTracker, - encode_supervisions, - setup_logger, - str2bool, - save_args, -) - -import wandb - -#from icefall.checkpoint import save_checkpoint as save_checkpoint_impl -LRSchedulerType = Union[torch.optim.lr_scheduler._LRScheduler, optim.LRScheduler] - - -def set_batch_count(model: Union[nn.Module, DDP], batch_count: float) -> None: - if isinstance(model, DDP): - # get underlying nn.Module - model = model.module - for module in model.modules(): - if hasattr(module, "batch_count"): - module.batch_count = batch_count - model.encoder.num_updates = int(batch_count) - - -def add_adapter_arguments(parser: argparse.ArgumentParser): - parser.add_argument( - "--add-adapter", - type=str2bool, - default=False, - help="add adapter to rep model's encoder" - ) - - parser.add_argument( - "--adapter-lr", - type=float, - default=0.0001, - help="adapter learning rate" - ) - - parser.add_argument( - "--gender", - type=str, - default='male', - help="select gender" - ) - - -def add_rep_arguments(parser: argparse.ArgumentParser): - parser.add_argument( - "--wandb", - type=str2bool, - default=True, - help="Use wandb for MLOps", - ) - parser.add_argument( - "--hpo", - type=str2bool, - default=False, - help="Use small db for HPO", - ) - - parser.add_argument( - "--accum-grads", - type=int, - default=1, - help="accum-grad num.", - ) - - parser.add_argument( - "--multi-optim", - type=str2bool, - default=True, - help="use sperate optimizer (enc / dec)", - ) - - parser.add_argument( - "--peak-enc-lr", - type=float, - default=0.0001, - help="The initial learning rate. This value should not need to be changed.", - ) - - parser.add_argument( - "--peak-dec-lr", - type=float, - default=0.001, - help="The initial learning rate. This value should not need to be changed.", - ) - - parser.add_argument( - "--encoder-type", - type=str, - default='d2v', - help="Type of encoder (e.g. conformer, w2v, d2v...", - ) - - parser.add_argument( - "--encoder-dim", - type=int, - default=768, - help="encoder embedding dimension", - ) - - parser.add_argument( - "--freeze-finetune-updates", - type=int, - default=0 - ) - - parser.add_argument( - "--additional-block", - type=str2bool, - default=True, - ) - - parser.add_argument( - "--decode-interval", - type=int, - default=200, - help="decode interval", - ) - - -def add_model_arguments(parser: argparse.ArgumentParser): - parser.add_argument( - "--num-encoder-layers", - type=str, - default="2,4,3,2,4", - help="Number of zipformer encoder layers, comma separated.", - ) - - parser.add_argument( - "--feedforward-dims", - type=str, - default="1024,1024,2048,2048,1024", - help="Feedforward dimension of the zipformer encoder layers, comma separated.", - ) - - parser.add_argument( - "--nhead", - type=str, - default="8,8,8,8,8", - help="Number of attention heads in the zipformer encoder layers.", - ) - - parser.add_argument( - "--encoder-dims", - type=str, - default="384,384,384,384,384", - help="Embedding dimension in the 2 blocks of zipformer encoder layers, comma separated", - ) - - parser.add_argument( - "--attention-dims", - type=str, - default="192,192,192,192,192", - help="""Attention dimension in the 2 blocks of zipformer encoder layers, comma separated; - not the same as embedding dimension.""", - ) - - parser.add_argument( - "--encoder-unmasked-dims", - type=str, - default="256,256,256,256,256", - help="Unmasked dimensions in the encoders, relates to augmentation during training. " - "Must be <= each of encoder_dims. Empirically, less than 256 seems to make performance " - " worse.", - ) - - parser.add_argument( - "--zipformer-downsampling-factors", - type=str, - default="1,2,4,8,2", - help="Downsampling factor for each stack of encoder layers.", - ) - - parser.add_argument( - "--cnn-module-kernels", - type=str, - default="31,31,31,31,31", - help="Sizes of kernels in convolution modules", - ) - - parser.add_argument( - "--decoder-dim", - type=int, - default=768, - help="Embedding dimension in the decoder model.", - ) - - parser.add_argument( - "--joiner-dim", - type=int, - default=768, - help="""Dimension used in the joiner model. - Outputs from the encoder and decoder model are projected - to this dimension before adding. - """, - ) - - -def get_parser(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - - parser.add_argument( - "--world-size", - type=int, - default=1, - help="Number of GPUs for DDP training.", - ) - - parser.add_argument( - "--master-port", - type=int, - default=12354, - help="Master port to use for DDP training.", - ) - - parser.add_argument( - "--tensorboard", - type=str2bool, - default=True, - help="Should various information be logged in tensorboard.", - ) - - parser.add_argument( - "--num-epochs", - type=int, - default=30, - help="Number of epochs to train.", - ) - - parser.add_argument( - "--start-epoch", - type=int, - default=1, - help="""Resume training from this epoch. It should be positive. - If larger than 1, it will load checkpoint from - exp-dir/epoch-{start_epoch-1}.pt - """, - ) - - parser.add_argument( - "--start-batch", - type=int, - default=0, - help="""If positive, --start-epoch is ignored and - it loads the checkpoint from exp-dir/checkpoint-{start_batch}.pt - """, - ) - - parser.add_argument( - "--exp-dir", - type=str, - default="pruned_transducer_stateless7_ctc/exp", - help="""The experiment dir. - It specifies the directory where all training related - files, e.g., checkpoints, log, etc, are saved - """, - ) - - parser.add_argument( - "--bpe-model", - type=str, - default="data/lang_bpe_500/bpe.model", - help="Path to the BPE model", - ) - - parser.add_argument( - "--base-lr", type=float, default=0.05, help="The base learning rate." - ) - - parser.add_argument( - "--lr-batches", - type=float, - default=5000, - help="""Number of steps that affects how rapidly the learning rate - decreases. We suggest not to change this.""", - ) - - parser.add_argument( - "--lr-epochs", - type=float, - default=3.5, - help="""Number of epochs that affects how rapidly the learning rate decreases. - """, - ) - - parser.add_argument( - "--context-size", - type=int, - default=2, - help="The context size in the decoder. 1 means bigram; 2 means tri-gram", - ) - - parser.add_argument( - "--prune-range", - type=int, - default=5, - help="The prune range for rnnt loss, it means how many symbols(context)" - "we are using to compute the loss", - ) - - parser.add_argument( - "--lm-scale", - type=float, - default=0.25, - help="The scale to smooth the loss with lm " - "(output of prediction network) part.", - ) - - parser.add_argument( - "--am-scale", - type=float, - default=0.0, - help="The scale to smooth the loss with am (output of encoder network) part.", - ) - - parser.add_argument( - "--simple-loss-scale", - type=float, - default=0.5, - help="To get pruning ranges, we will calculate a simple version" - "loss(joiner is just addition), this simple loss also uses for" - "training (as a regularization item). We will scale the simple loss" - "with this parameter before adding to the final loss.", - ) - - parser.add_argument( - "--ctc-loss-scale", - type=float, - default=0.2, - help="Scale for CTC loss.", - ) - - parser.add_argument( - "--seed", - type=int, - default=42, - help="The seed for random generators intended for reproducibility", - ) - - parser.add_argument( - "--print-diagnostics", - type=str2bool, - default=False, - help="Accumulate stats on activations, print them and exit.", - ) - - parser.add_argument( - "--inf-check", - type=str2bool, - default=False, - help="Add hooks to check for infinite module outputs and gradients.", - ) - - parser.add_argument( - "--save-every-n", - type=int, - default=2000, - help="""Save checkpoint after processing this number of batches" - periodically. We save checkpoint to exp-dir/ whenever - params.batch_idx_train % save_every_n == 0. The checkpoint filename - has the form: f'exp-dir/checkpoint-{params.batch_idx_train}.pt' - Note: It also saves checkpoint to `exp-dir/epoch-xxx.pt` at the - end of each epoch where `xxx` is the epoch number counting from 0. - """, - ) - - parser.add_argument( - "--keep-last-k", - type=int, - default=30, - help="""Only keep this number of checkpoints on disk. - For instance, if it is 3, there are only 3 checkpoints - in the exp-dir with filenames `checkpoint-xxx.pt`. - It does not affect checkpoints with name `epoch-xxx.pt`. - """, - ) - - parser.add_argument( - "--average-period", - type=int, - default=200, - help="""Update the averaged model, namely `model_avg`, after processing - this number of batches. `model_avg` is a separate version of model, - in which each floating-point parameter is the average of all the - parameters from the start of training. Each time we take the average, - we do: `model_avg = model * (average_period / batch_idx_train) + - model_avg * ((batch_idx_train - average_period) / batch_idx_train)`. - """, - ) - - parser.add_argument( - "--use-fp16", - type=str2bool, - default=True, - help="Whether to use half precision training.", - ) - - add_model_arguments(parser) - add_rep_arguments(parser) - add_adapter_arguments(parser) - - return parser - - -def get_params() -> AttributeDict: - """Return a dict containing training parameters. - - All training related parameters that are not passed from the commandline - are saved in the variable `params`. - - Commandline options are merged into `params` after they are parsed, so - you can also access them via `params`. - - Explanation of options saved in `params`: - - - best_train_loss: Best training loss so far. It is used to select - the model that has the lowest training loss. It is - updated during the training. - - - best_valid_loss: Best validation loss so far. It is used to select - the model that has the lowest validation loss. It is - updated during the training. - - - best_train_epoch: It is the epoch that has the best training loss. - - - best_valid_epoch: It is the epoch that has the best validation loss. - - - batch_idx_train: Used to writing statistics to tensorboard. It - contains number of batches trained so far across - epochs. - - - log_interval: Print training loss if batch_idx % log_interval` is 0 - - - reset_interval: Reset statistics if batch_idx % reset_interval is 0 - - - valid_interval: Run validation if batch_idx % valid_interval is 0 - - - feature_dim: The model input dim. It has to match the one used - in computing features. - - - subsampling_factor: The subsampling factor for the model. - - - encoder_dim: Hidden dim for multi-head attention model. - - - num_decoder_layers: Number of decoder layer of transformer decoder. - - - warm_step: The warmup period that dictates the decay of the - scale on "simple" (un-pruned) loss. - """ - params = AttributeDict( - { - "best_train_loss": float("inf"), - "best_valid_loss": float("inf"), - "best_train_epoch": -1, - "best_valid_epoch": -1, - "batch_idx_train": 0, - "log_interval": 50, - "reset_interval": 200, - "valid_interval": 3000, # For the 100h subset, use 800 - # parameters for zipformer - "feature_dim": 80, - "subsampling_factor": 320, # not passed in, this is fixed. - # parameters for ctc loss - "beam_size": 10, - "use_double_scores": True, - "warm_step": 0, - #"warm_step": 4000, - #"warm_step": 3000, - "env_info": get_env_info(), - } - ) - - return params - - -def get_encoder_model(params: AttributeDict) -> nn.Module: - # TODO: We can add an option to switch between Zipformer and Transformer - def to_int_tuple(s: str): - return tuple(map(int, s.split(","))) - - if params.encoder_type == 'd2v': - encoder = FairSeqData2VecEncoder( - input_size=params.encoder_dim, - w2v_url='None', - output_size=params.encoder_dim, - freeze_finetune_updates=params.freeze_finetune_updates, - additional_block=params.additional_block, - ) - else: - encoder = Zipformer( - num_features=params.feature_dim, - output_downsampling_factor=2, - zipformer_downsampling_factors=to_int_tuple( - params.zipformer_downsampling_factors - ), - encoder_dims=to_int_tuple(params.encoder_dims), - attention_dim=to_int_tuple(params.attention_dims), - encoder_unmasked_dims=to_int_tuple(params.encoder_unmasked_dims), - nhead=to_int_tuple(params.nhead), - feedforward_dim=to_int_tuple(params.feedforward_dims), - cnn_module_kernels=to_int_tuple(params.cnn_module_kernels), - num_encoder_layers=to_int_tuple(params.num_encoder_layers), - ) - - return encoder - - -def get_decoder_model(params: AttributeDict) -> nn.Module: - decoder = Decoder( - vocab_size=params.vocab_size, - decoder_dim=params.decoder_dim, - blank_id=params.blank_id, - context_size=params.context_size, - ) - return decoder - - -def get_joiner_model(params: AttributeDict) -> nn.Module: - joiner = Joiner( - encoder_dim=params.encoder_dim if params.encoder_type == 'd2v' else int(params.encoder_dims.split(",")[-1]), - decoder_dim=params.decoder_dim, - joiner_dim=params.joiner_dim, - vocab_size=params.vocab_size, - ) - return joiner - - -def get_transducer_model(params: AttributeDict) -> nn.Module: - encoder = get_encoder_model(params) - decoder = get_decoder_model(params) - joiner = get_joiner_model(params) - - model = Transducer( - encoder=encoder, - decoder=decoder, - joiner=joiner, - encoder_dim=params.encoder_dim if params.encoder_type == 'd2v' else int(params.encoder_dims.split(",")[-1]), - decoder_dim=params.decoder_dim, - joiner_dim=params.joiner_dim, - vocab_size=params.vocab_size, - ) - return model - - -def load_checkpoint_if_available( - params: AttributeDict, - model: nn.Module, - model_avg: nn.Module = None, - optimizer: Optional[torch.optim.Optimizer] = None, - scheduler: Optional[LRSchedulerType] = None, -) -> Optional[Dict[str, Any]]: - """Load checkpoint from file. - - If params.start_batch is positive, it will load the checkpoint from - `params.exp_dir/checkpoint-{params.start_batch}.pt`. Otherwise, if - params.start_epoch is larger than 1, it will load the checkpoint from - `params.start_epoch - 1`. - - Apart from loading state dict for `model` and `optimizer` it also updates - `best_train_epoch`, `best_train_loss`, `best_valid_epoch`, - and `best_valid_loss` in `params`. - - Args: - params: - The return value of :func:`get_params`. - model: - The training model. - model_avg: - The stored model averaged from the start of training. - optimizer: - The optimizer that we are using. - scheduler: - The scheduler that we are using. - Returns: - Return a dict containing previously saved training info. - """ - if params.start_batch > 0: - filename = params.exp_dir / f"checkpoint-{params.start_batch}.pt" - elif params.start_epoch > 1: - filename = params.exp_dir / f"epoch-{params.start_epoch-1}.pt" - elif params.add_adapter: - filename = params.exp_dir / f"../d2v-base-T.pt" - else: - return None - - assert filename.is_file(), f"{filename} does not exist!" - - saved_params = load_checkpoint( - filename, - model=model, - model_avg=model_avg, - optimizer=optimizer, - scheduler=scheduler, - strict=True if not params.add_adapter else False, - ) - - keys = [ - "best_train_epoch", - "best_valid_epoch", - "batch_idx_train", - "best_train_loss", - "best_valid_loss", - ] - for k in keys: - params[k] = saved_params[k] - - if params.start_batch > 0: - if "cur_epoch" in saved_params: - params["start_epoch"] = saved_params["cur_epoch"] - - if "cur_batch_idx" in saved_params: - params["cur_batch_idx"] = saved_params["cur_batch_idx"] - - return saved_params - - -def save_checkpoint( - params: AttributeDict, - model: Union[nn.Module, DDP], - model_avg: Optional[nn.Module] = None, - optimizer: Optional[torch.optim.Optimizer] = None, - scheduler: Optional[LRSchedulerType] = None, - sampler: Optional[CutSampler] = None, - scaler: Optional[GradScaler] = None, - rank: int = 0, -) -> None: - """Save model, optimizer, scheduler and training stats to file. - - Args: - params: - It is returned by :func:`get_params`. - model: - The training model. - model_avg: - The stored model averaged from the start of training. - optimizer: - The optimizer used in the training. - sampler: - The sampler for the training dataset. - scaler: - The scaler used for mix precision training. - """ - if rank != 0: - return - filename = params.exp_dir / f"epoch-{params.cur_epoch}.pt" - save_checkpoint_impl( - filename=filename, - model=model, - model_avg=model_avg, - params=params, - optimizer=optimizer, - scheduler=scheduler, - sampler=sampler, - scaler=scaler, - rank=rank, - ) - - if params.best_train_epoch == params.cur_epoch: - best_train_filename = params.exp_dir / "best-train-loss.pt" - copyfile(src=filename, dst=best_train_filename) - - if params.best_valid_epoch == params.cur_epoch: - best_valid_filename = params.exp_dir / "best-valid-loss.pt" - copyfile(src=filename, dst=best_valid_filename) - - -def compute_loss( - params: AttributeDict, - model: Union[nn.Module, DDP], - sp: spm.SentencePieceProcessor, - batch: dict, - is_training: bool, - decode: bool = False, -) -> Tuple[Tensor, MetricsTracker]: - """ - Compute transducer loss given the model and its inputs. - - Args: - params: - Parameters for training. See :func:`get_params`. - model: - The model for training. It is an instance of Zipformer in our case. - batch: - A batch of data. See `lhotse.dataset.K2SpeechRecognitionDataset()` - for the content in it. - is_training: - True for training. False for validation. When it is True, this - function enables autograd during computation; when it is False, it - disables autograd. - warmup: a floating point value which increases throughout training; - values >= 1.0 are fully warmed up and have all modules present. - """ - device = model.device if isinstance(model, DDP) else next(model.parameters()).device - feature = batch["inputs"] - # at entry, feature is (N, T, C) - assert feature.ndim == 2 or feature.ndim == 3 - feature = feature.to(device) - - supervisions = batch["supervisions"] - - if feature.ndim == 2: - feature_lens = [] - for supervision in supervisions['cut']: - try: feature_lens.append(supervision.tracks[0].cut.recording.num_samples) - except: feature_lens.append(supervision.recording.num_samples) - feature_lens = torch.tensor(feature_lens) - - elif feature.ndim == 3: - feature_lens = supervisions["num_frames"].to(device) - - batch_idx_train = params.batch_idx_train - warm_step = params.warm_step - - texts = batch["supervisions"]["text"] - - token_ids = sp.encode(texts, out_type=int) - y = k2.RaggedTensor(token_ids).to(device) - - with torch.set_grad_enabled(is_training): - simple_loss, pruned_loss, ctc_output = model( - x=feature, - x_lens=feature_lens, - y=y, - prune_range=params.prune_range, - am_scale=params.am_scale, - lm_scale=params.lm_scale, - ) - - s = params.simple_loss_scale - # take down the scale on the simple loss from 1.0 at the start - # to params.simple_loss scale by warm_step. - simple_loss_scale = ( - s - if batch_idx_train >= warm_step - else 1.0 - (batch_idx_train / warm_step) * (1.0 - s) - ) - pruned_loss_scale = ( - 1.0 - if batch_idx_train >= warm_step - else 0.1 + 0.9 * (batch_idx_train / warm_step) - ) - - loss = simple_loss_scale * simple_loss + pruned_loss_scale * pruned_loss - - info = MetricsTracker() - - if params.ctc_loss_scale > 0: - # Compute ctc loss - - # NOTE: We need `encode_supervisions` to sort sequences with - # different duration in decreasing order, required by - # `k2.intersect_dense` called in `k2.ctc_loss` - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - supervision_segments, token_ids = encode_supervisions( - supervisions, - subsampling_factor=params.subsampling_factor, - token_ids=token_ids, - ) - - # Works with a BPE model - decoding_graph = k2.ctc_graph(token_ids, modified=False, device=device) - dense_fsa_vec = k2.DenseFsaVec( - ctc_output, - supervision_segments, - allow_truncate=params.subsampling_factor - 1, - ) - - ctc_loss = k2.ctc_loss( - decoding_graph=decoding_graph, - dense_fsa_vec=dense_fsa_vec, - output_beam=params.beam_size, - reduction="sum", - use_double_scores=params.use_double_scores, - ) - assert ctc_loss.requires_grad == is_training - loss += params.ctc_loss_scale * ctc_loss - - info["ctc_loss"] = ctc_loss.detach().cpu().item() - - assert loss.requires_grad == is_training - - if decode: - model.eval() - with torch.no_grad(): - hypos = model.module.decode( - x=feature, - x_lens=feature_lens, - y=y, - sp=sp - ) - logging.info(f'ref: {batch["supervisions"]["text"][0]}') - logging.info(f'hyp: {" ".join(hypos[0])}') - model.train() - - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - info["frames"] = (feature_lens // params.subsampling_factor).sum().item() - - # Note: We use reduction=sum while computing the loss. - info["utterances"] = feature.size(0) - info["loss"] = loss.detach().cpu().item() - info["simple_loss"] = simple_loss.detach().cpu().item() - info["pruned_loss"] = pruned_loss.detach().cpu().item() - - return loss, info - - -def compute_validation_loss( - params: AttributeDict, - model: Union[nn.Module, DDP], - sp: spm.SentencePieceProcessor, - valid_dl: torch.utils.data.DataLoader, - world_size: int = 1, -) -> MetricsTracker: - """Run the validation process.""" - model.eval() - - tot_loss = MetricsTracker() - - for batch_idx, batch in enumerate(valid_dl): - loss, loss_info = compute_loss( - params=params, - model=model, - sp=sp, - batch=batch, - is_training=False, - ) - assert loss.requires_grad is False - tot_loss = tot_loss + loss_info - - if world_size > 1: - tot_loss.reduce(loss.device) - - loss_value = tot_loss["loss"] / tot_loss["utterances"] - if loss_value < params.best_valid_loss: - params.best_valid_epoch = params.cur_epoch - params.best_valid_loss = loss_value - - return tot_loss - - -def train_one_epoch( - params: AttributeDict, - model: Union[nn.Module, DDP], - optimizer: torch.optim.Optimizer or [torch.optim.Optimizer, torch.optim.Optimizer], - scheduler: LRSchedulerType or [LRSchedulerType, LRSchedulerType], - sp: spm.SentencePieceProcessor, - train_dl: torch.utils.data.DataLoader or [torch.utils.data.DataLoader, torch.utils.data.DataLoader], - valid_dl: torch.utils.data.DataLoader, - scaler: GradScaler, - model_avg: Optional[nn.Module] = None, - tb_writer: Optional[SummaryWriter] = None, - world_size: int = 1, - rank: int = 0, - wb = None, -) -> None: - """Train the model for one epoch. - - The training loss from the mean of all frames is saved in - `params.train_loss`. It runs the validation process every - `params.valid_interval` batches. - - Args: - params: - It is returned by :func:`get_params`. - model: - The model for training. - optimizer: - The optimizer we are using. - scheduler: - The learning rate scheduler, we call step() every step. - train_dl: - Dataloader for the training dataset. - valid_dl: - Dataloader for the validation dataset. - scaler: - The scaler used for mix precision training. - model_avg: - The stored model averaged from the start of training. - tb_writer: - Writer to write log messages to tensorboard. - world_size: - Number of nodes in DDP training. If it is 1, DDP is disabled. - rank: - The rank of the node in DDP training. If no DDP is used, it should - be set to 0. - """ - model.train() - - tot_loss = MetricsTracker() - - cur_batch_idx = params.get("cur_batch_idx", 0) - - if params.multi_optim: - optimizer_enc, optimizer_dec = optimizer[0], optimizer[1] - scheduler_enc, scheduler_dec = scheduler[0], scheduler[1] - - if type(train_dl) == list: - train_dl_uda = train_dl[1] - train_dl = train_dl[0] - #for batch_idx, batch in enumerate(train_dl): - for batch_idx, batch in enumerate(zip(train_dl, train_dl_uda)): - if batch_idx < cur_batch_idx: - continue - cur_batch_idx = batch_idx - - if batch_idx % params.accum_grads == 0: params.batch_idx_train += 1 - batch_size = len(batch["supervisions"]["text"]) - - try: - with torch.cuda.amp.autocast(enabled=params.use_fp16): - loss, loss_info = compute_loss( - params=params, - model=model, - sp=sp, - batch=batch, - is_training=True, - decode = True if batch_idx % params.decode_interval == 0 else False, - ) - loss_info.reduce(loss.device) - - numel = params.world_size / (params.accum_grads * loss_info["utterances"]) - loss *= numel ## normalize loss over utts(batch size) - - # summary stats - tot_loss = (tot_loss * (1 - 1 / params.reset_interval)) + loss_info - - # NOTE: We use reduction==sum and loss is computed over utterances - # in the batch and there is no normalization to it so far. - scaler.scale(loss).backward() - - if params.multi_optim and (batch_idx+1) % params.accum_grads == 0: - set_batch_count(model, params.batch_idx_train) - scheduler_enc.step_batch(params.batch_idx_train) - scheduler_dec.step_batch(params.batch_idx_train) - scaler.step(optimizer_enc) - scaler.step(optimizer_dec) - scaler.update() - optimizer_enc.zero_grad() - optimizer_dec.zero_grad() - elif not params.multi_optim and (batch_idx+1) % params.accum_grads == 0: - set_batch_count(model, params.batch_idx_train) - scheduler.step_batch(params.batch_idx_train) - scaler.step(optimizer) - scaler.update() - optimizer.zero_grad() - - except: # noqa - display_and_save_batch(batch, params=params, sp=sp) - raise - - if params.print_diagnostics and batch_idx == 5: - return - - if ( - rank == 0 - and params.batch_idx_train > 0 - and params.batch_idx_train % params.average_period == 0 - ): - update_averaged_model( - params=params, - model_cur=model, - model_avg=model_avg, - ) - - if ( - params.batch_idx_train > 0 - and params.batch_idx_train % params.save_every_n == 0 - ): - params.cur_batch_idx = batch_idx - save_checkpoint_with_global_batch_idx( - out_dir=params.exp_dir, - global_batch_idx=params.batch_idx_train, - model=model, - model_avg=model_avg, - params=params, - optimizer=optimizer, - scheduler=scheduler, - sampler=train_dl.sampler, - scaler=scaler, - rank=rank, - ) - del params.cur_batch_idx - remove_checkpoints( - out_dir=params.exp_dir, - topk=params.keep_last_k, - rank=rank, - ) - - if batch_idx % 100 == 0 and params.use_fp16: - # If the grad scale was less than 1, try increasing it. The _growth_interval - # of the grad scaler is configurable, but we can't configure it to have different - # behavior depending on the current grad scale. - cur_grad_scale = scaler._scale.item() - ''' - if cur_grad_scale < 1.0 or (cur_grad_scale < 8.0 and batch_idx % 400 == 0): - scaler.update(cur_grad_scale * 2.0) - ''' - if cur_grad_scale < 0.01: - logging.warning(f"Grad scale is small: {cur_grad_scale}") - if cur_grad_scale < 1.0e-05: - wb.log({"valid/loss": 10000}) - raise RuntimeError( - f"grad_scale is too small, exiting: {cur_grad_scale}" - ) - - #if params.batch_idx_train > 4000 and loss > 300 and params.wandb: - # wb.log({"valid/loss": 10000}) - # raise RuntimeError( - # f"divergence... exiting: loss={loss}" - # ) - - if batch_idx % (params.log_interval*params.accum_grads) == 0: - #for n, p in model.named_parameters(): - # if 'adapter' in n: - # print(p) - if params.multi_optim: - cur_enc_lr = scheduler_enc.get_last_lr()[0] - cur_dec_lr = scheduler_dec.get_last_lr()[0] - cur_grad_scale = scaler._scale.item() if params.use_fp16 else 1.0 - - logging.info( - f"Epoch {params.cur_epoch}, " - f"batch {batch_idx}, loss[{loss_info}], " - f"tot_loss[{tot_loss}], batch size: {batch_size}, " - f"enc_lr: {cur_enc_lr:.2e}, " - f"dec_lr: {cur_dec_lr:.2e}, " - + (f"grad_scale: {scaler._scale.item()}" if params.use_fp16 else "") - ) - - else: - cur_lr = scheduler.get_last_lr()[0] - cur_grad_scale = scaler._scale.item() if params.use_fp16 else 1.0 - - logging.info( - f"Epoch {params.cur_epoch}, " - f"batch {batch_idx}, loss[{loss_info}], " - f"tot_loss[{tot_loss}], batch size: {batch_size}, " - f"lr: {cur_lr:.2e}, " - + (f"grad_scale: {scaler._scale.item()}" if params.use_fp16 else "") - ) - - if tb_writer is not None: - if params.multi_optim: - tb_writer.add_scalar( - "train/enc_learning_rate", cur_enc_lr, params.batch_idx_train - ) - tb_writer.add_scalar( - "train/dec_learning_rate", cur_dec_lr, params.batch_idx_train - ) - - else: - tb_writer.add_scalar( - "train/learning_rate", cur_lr, params.batch_idx_train - ) - - loss_info.write_summary( - tb_writer, "train/current_", params.batch_idx_train - ) - tot_loss.write_summary(tb_writer, "train/tot_", params.batch_idx_train) - if params.use_fp16: - tb_writer.add_scalar( - "train/grad_scale", - cur_grad_scale, - params.batch_idx_train, - ) - - if wb is not None and rank == 0: - wb.log({"train/loss": loss_info["loss"]*numel}) - wb.log({"train/simple_loss": loss_info["simple_loss"]*numel}) - wb.log({"train/pruned_loss": loss_info["pruned_loss"]*numel}) - wb.log({"train/ctc_loss": loss_info["ctc_loss"]*numel}) - - ''' - logging.info("Computing validation loss") - valid_info = compute_validation_loss( - params=params, - model=model, - sp=sp, - valid_dl=valid_dl, - world_size=world_size, - ) - model.train() - logging.info(f"Epoch {params.cur_epoch}, validation: {valid_info}") - logging.info( - f"Maximum memory allocated so far is {torch.cuda.max_memory_allocated()//1000000}MB" - ) - if tb_writer is not None: - valid_info.write_summary( - tb_writer, "train/valid_", params.batch_idx_train - ) - - if wb is not None and rank == 0: - numel = 1 / (params.accum_grads * valid_info["utterances"]) - #wb.log({"valid/loss": valid_info["loss"]*numel}) - wb.log({"valid/loss": numel*(valid_info["simple_loss"] - +valid_info["pruned_loss"] - +valid_info["ctc_loss"] - )}) - wb.log({"valid/simple_loss": valid_info["simple_loss"]*numel}) - wb.log({"valid/pruned_loss": valid_info["pruned_loss"]*numel}) - wb.log({"valid/ctc_loss": valid_info["ctc_loss"]*numel}) - ''' - loss_value = tot_loss["loss"] / tot_loss["utterances"] - params.train_loss = loss_value - if params.train_loss < params.best_train_loss: - params.best_train_epoch = params.cur_epoch - params.best_train_loss = params.train_loss - - -def run(rank, world_size, args, wb=None): - """ - Args: - rank: - It is a value between 0 and `world_size-1`, which is - passed automatically by `mp.spawn()` in :func:`main`. - The node with rank 0 is responsible for saving checkpoint. - world_size: - Number of GPUs for DDP training. - args: - The return value of get_parser().parse_args() - """ - params = get_params() - params.update(vars(args)) - #params.warm_step *= params.accum_grads - - fix_random_seed(params.seed) - if world_size > 1: - setup_dist(rank, world_size, params.master_port) - - setup_logger(f"{params.exp_dir}/log/log-train") - logging.info("Training started") - - if args.tensorboard and rank == 0: - tb_writer = SummaryWriter(log_dir=f"{params.exp_dir}/tensorboard") - else: - tb_writer = None - - device = torch.device("cpu") - if torch.cuda.is_available(): - device = torch.device("cuda", rank) - logging.info(f"Device: {device}") - - sp = spm.SentencePieceProcessor() - sp.load(params.bpe_model) - - # is defined in local/train_bpe_model.py - params.blank_id = sp.piece_to_id("") - params.vocab_size = sp.get_piece_size() - - logging.info(params) - - logging.info("About to create model") - model = get_transducer_model(params) - logging.info(model) - - num_param = sum([p.numel() for p in model.parameters()]) - logging.info(f"Number of model parameters: {num_param}") - - assert params.save_every_n >= params.average_period - model_avg: Optional[nn.Module] = None - if rank == 0: - # model_avg is only used with rank 0 - model_avg = copy.deepcopy(model).to(torch.float64) - - assert params.start_epoch > 0, params.start_epoch - checkpoints = load_checkpoint_if_available( - params=params, model=model, model_avg=model_avg - ) - - model.to(device) - if world_size > 1: - logging.info("Using DDP") - model = DDP(model, device_ids=[rank], find_unused_parameters=True) - - if params.multi_optim: - logging.info("Using seperate optimizers over encoder, decoder ...") - - enc_param = [] - enc_names = [] - - dec_names = [] - dec_param = [] - - for n, p in model.named_parameters(): - name = n.split('.')[1] - if name == 'encoder' and 'feature_extractor' not in n: - enc_names.append(n) - enc_param.append(p) - elif 'ctc_output' in n: - enc_names.append(n) - enc_param.append(p) - elif 'feature_extractor' not in n: - dec_names.append(n) - dec_param.append(p) - - optimizer_enc = ScaledAdam( - enc_param, - lr=params.peak_enc_lr, - clipping_scale=None, - parameters_names=[enc_names], - ) - optimizer_dec = ScaledAdam( - dec_param, - lr=params.peak_dec_lr, - clipping_scale=5.0, - parameters_names=[dec_names], - ) - - scheduler_enc = Eden(optimizer_enc, params.lr_batches, params.lr_epochs) - scheduler_dec = Eden(optimizer_dec, params.lr_batches, params.lr_epochs) - optimizer = [optimizer_enc, optimizer_dec] - scheduler = [scheduler_enc, scheduler_dec] - - else: - parameters_names = [] - parameters_names.append( - [name_param_pair[0] for name_param_pair in model.named_parameters()] - ) - - logging.info(f"len name = {len(parameters_names)}") - logging.info(f"len param = {len(list(model.parameters()))}") - - optimizer = ScaledAdam( - model.parameters(), - lr=params.base_lr, - clipping_scale=2.0, - parameters_names=parameters_names, - ) - - scheduler = Eden(optimizer, params.lr_batches, params.lr_epochs) - - if checkpoints and ("optimizer" in checkpoints or "optimizer_enc" in checkpoints): - if params.multi_optim: - logging.info("Loading optimizer state dict") - optimizer_enc.load_state_dict(checkpoints["optimizer_enc"]) - optimizer_dec.load_state_dict(checkpoints["optimizer_dec"]) - - else: - logging.info("Loading optimizer state dict") - optimizer.load_state_dict(checkpoints["optimizer"]) - - if checkpoints: - if ( - params.multi_optim - and "scheduler_enc" in checkpoints - and checkpoints["scheduler_enc"] is not None - ): - logging.info("Loading enc/dec scheduler state dict") - scheduler_enc.load_state_dict(checkpoints["scheduler_enc"]) - scheduler_dec.load_state_dict(checkpoints["scheduler_dec"]) - else: - logging.info("Loading scheduler state dict") - scheduler.load_state_dict(checkpoints["scheduler"]) - - if params.print_diagnostics: - opts = diagnostics.TensorDiagnosticOptions( - 2**22 - ) # allow 4 megabytes per sub-module - diagnostic = diagnostics.attach_diagnostics(model, opts) - - if params.inf_check: - register_inf_check_hooks(model) - - librispeech = LibriSpeechAsrDataModule(args) - - train_cuts = librispeech.train_clean_100_cuts() - if params.full_libri: - train_cuts += librispeech.train_clean_360_cuts() - train_cuts += librispeech.train_other_500_cuts() - - def remove_short_and_long_utt(c: Cut): - # Keep only utterances with duration between 1 second and 20 seconds - # - # Caution: There is a reason to select 20.0 here. Please see - # ../local/display_manifest_statistics.py - # - # You should use ../local/display_manifest_statistics.py to get - # an utterance duration distribution for your dataset to select - # the threshold - return 1.0 <= c.duration <= 20.0 - - train_cuts = train_cuts.filter(remove_short_and_long_utt) - - if params.start_batch > 0 and checkpoints and "sampler" in checkpoints: - # We only load the sampler's state dict when it loads a checkpoint - # saved in the middle of an epoch - sampler_state_dict = checkpoints["sampler"] - else: - sampler_state_dict = None - - train_dl = librispeech.train_dataloaders( - train_cuts, sampler_state_dict=sampler_state_dict - ) - - valid_cuts = librispeech.dev_clean_cuts() - valid_cuts += librispeech.dev_other_cuts() - valid_dl = librispeech.valid_dataloaders(valid_cuts) - - ''' - if not params.print_diagnostics: - scan_pessimistic_batches_for_oom( - model=model, - train_dl=train_dl, - optimizer=optimizer, - sp=sp, - params=params, - ) - ''' - - scaler = GradScaler(enabled=params.use_fp16, init_scale=1.0) - if checkpoints and "grad_scaler" in checkpoints: - logging.info("Loading grad scaler state dict") - scaler.load_state_dict(checkpoints["grad_scaler"]) - - for epoch in range(params.start_epoch, params.num_epochs + 1): - if params.multi_optim: - scheduler_enc.step_epoch(epoch - 1) - scheduler_dec.step_epoch(epoch - 1) - else: - scheduler.step_epoch(epoch - 1) - fix_random_seed(params.seed + epoch - 1) - train_dl.sampler.set_epoch(epoch - 1) - - if tb_writer is not None: - tb_writer.add_scalar("train/epoch", epoch, params.batch_idx_train) - - params.cur_epoch = epoch - - train_one_epoch( - params=params, - model=model, - model_avg=model_avg, - optimizer=optimizer, - scheduler=scheduler, - sp=sp, - train_dl=train_dl, - valid_dl=valid_dl, - scaler=scaler, - tb_writer=tb_writer, - world_size=world_size, - rank=rank, - wb=wb, - ) - - if params.print_diagnostics: - diagnostic.print_diagnostics() - break - - if epoch % 10 == 0: - save_checkpoint( - params=params, - model=model, - model_avg=model_avg, - optimizer=optimizer, - scheduler=scheduler, - sampler=train_dl.sampler, - scaler=scaler, - rank=rank, - ) - - logging.info("Done!") - - if world_size > 1: - torch.distributed.barrier() - cleanup_dist() - - -def run_adapter(rank, world_size, args, wb=None): - """ - Args: - rank: - It is a value between 0 and `world_size-1`, which is - passed automatically by `mp.spawn()` in :func:`main`. - The node with rank 0 is responsible for saving checkpoint. - world_size: - Number of GPUs for DDP training. - args: - The return value of get_parser().parse_args() - """ - params = get_params() - params.update(vars(args)) - - fix_random_seed(params.seed) - if world_size > 1: - setup_dist(rank, world_size, params.master_port) - - setup_logger(f"{params.exp_dir}/log/log-train") - logging.info("Training started") - - if args.tensorboard and rank == 0: - tb_writer = SummaryWriter(log_dir=f"{params.exp_dir}/tensorboard") - else: - tb_writer = None - - device = torch.device("cpu") - if torch.cuda.is_available(): - device = torch.device("cuda", rank) - logging.info(f"Device: {device}") - - sp = spm.SentencePieceProcessor() - sp.load(params.bpe_model) - - # is defined in local/train_bpe_model.py - params.blank_id = sp.piece_to_id("") - params.vocab_size = sp.get_piece_size() - - logging.info(params) - - logging.info("About to create model") - model = get_transducer_model(params) - - num_param = sum([p.numel() if p.requires_grad else 0 for p in model.parameters()]) - logging.info(f"Number of model parameters: {num_param}") - - assert params.save_every_n >= params.average_period - model_avg: Optional[nn.Module] = None - if rank == 0: - # model_avg is only used with rank 0 - model_avg = copy.deepcopy(model).to(torch.float64) - - assert params.start_epoch > 0, params.start_epoch - checkpoints = load_checkpoint_if_available( - params=params, model=model, model_avg=model_avg - ) - - model.to(device) - if world_size > 1: - logging.info("Using DDP") - model = DDP(model, device_ids=[rank], find_unused_parameters=True) - - adapter_names = [] - adapter_param = [] - for n, p in model.named_parameters(): - if 'adapters' in n:# or 'joiner' in n or 'simple' in n or 'ctc' in n: - adapter_names.append(n) - adapter_param.append(p) - elif 'joiner' in n or 'simple' in n or 'ctc' in n: - p.requires_grad = True - else: - p.requires_grad = False - - optimizer_adapter = ScaledAdam( - adapter_param, - lr=params.adapter_lr, - clipping_scale=5.0, - parameters_names=[adapter_names], - ) - scheduler_adapter = Eden(optimizer_adapter, 10000, 7) #params.lr_batche, params.lr_epochs) - - optimizer, scheduler = optimizer_adapter, scheduler_adapter - - librispeech = LibriSpeechAsrDataModule(args) - - ''' - if params.hpo: - train_cuts = librispeech.train_clean_10_cuts(option=params.gender) - else: - train_cuts = librispeech.train_clean_100_cuts(option=params.gender) - if params.full_libri: - train_cuts += librispeech.train_clean_360_cuts(option=params.gender) - train_cuts += librispeech.train_other_500_cuts(option=params.gender) - ''' - - #train_cuts = librispeech.train_clean_10_cuts(option='male') - #train_cuts = librispeech.test_clean_user(option='big') - train_cuts = librispeech.vox_cuts(option=params.spk_id) - - def remove_short_and_long_utt(c: Cut): - return 1.0 <= c.duration <= 20.0 - - train_cuts = train_cuts.filter(remove_short_and_long_utt) - - sampler_state_dict = None - - train_dl = librispeech.train_dataloaders( - train_cuts, sampler_state_dict=sampler_state_dict - ) - #train_dl = librispeech.test_dataloaders( - # train_cuts - #) - - ''' - print('\n'*5) - print('-'*30) - for batch in train_dl: - print(batch) - print('-'*30) - print('\n'*5) - exit() - ''' - - valid_cuts = librispeech.dev_clean_cuts(option=params.gender) - valid_cuts += librispeech.dev_other_cuts(option=params.gender) - valid_dl = librispeech.valid_dataloaders(valid_cuts) - - scaler = GradScaler(enabled=params.use_fp16, init_scale=1.0) - - for epoch in range(params.start_epoch, params.num_epochs + 1): - scheduler.step_epoch(epoch - 1) - fix_random_seed(params.seed + epoch - 1) - train_dl.sampler.set_epoch(epoch - 1) - - if tb_writer is not None: - tb_writer.add_scalar("train/epoch", epoch, params.batch_idx_train) - - params.cur_epoch = epoch - - train_one_epoch( - params=params, - model=model, - model_avg=model_avg, - optimizer=optimizer, - scheduler=scheduler, - sp=sp, - train_dl=train_dl, - valid_dl=valid_dl, - scaler=scaler, - tb_writer=tb_writer, - world_size=world_size, - rank=rank, - wb=wb, - ) - - if params.print_diagnostics: - diagnostic.print_diagnostics() - break - - if epoch % 10 == 0: - save_checkpoint( - params=params, - model=model, - model_avg=model_avg, - optimizer=optimizer, - scheduler=scheduler, - sampler=train_dl.sampler, - scaler=scaler, - rank=rank, - ) - - logging.info("Done!") - - if world_size > 1: - torch.distributed.barrier() - cleanup_dist() - - -def run_adapter_uda(rank, world_size, args, wb=None): - """ - Args: - rank: - It is a value between 0 and `world_size-1`, which is - passed automatically by `mp.spawn()` in :func:`main`. - The node with rank 0 is responsible for saving checkpoint. - world_size: - Number of GPUs for DDP training. - args: - The return value of get_parser().parse_args() - """ - params = get_params() - params.update(vars(args)) - - fix_random_seed(params.seed) - if world_size > 1: - setup_dist(rank, world_size, params.master_port) - - setup_logger(f"{params.exp_dir}/log/log-train") - logging.info("Training started") - - if args.tensorboard and rank == 0: - tb_writer = SummaryWriter(log_dir=f"{params.exp_dir}/tensorboard") - else: - tb_writer = None - - device = torch.device("cpu") - if torch.cuda.is_available(): - device = torch.device("cuda", rank) - logging.info(f"Device: {device}") - - sp = spm.SentencePieceProcessor() - sp.load(params.bpe_model) - - # is defined in local/train_bpe_model.py - params.blank_id = sp.piece_to_id("") - params.vocab_size = sp.get_piece_size() - - logging.info(params) - - logging.info("About to create model") - model = get_transducer_model(params) - - num_param = sum([p.numel() if p.requires_grad else 0 for p in model.parameters()]) - logging.info(f"Number of model parameters: {num_param}") - - assert params.save_every_n >= params.average_period - model_avg: Optional[nn.Module] = None - if rank == 0: - # model_avg is only used with rank 0 - model_avg = copy.deepcopy(model).to(torch.float64) - - assert params.start_epoch > 0, params.start_epoch - checkpoints = load_checkpoint_if_available( - params=params, model=model, model_avg=model_avg - ) - - model.to(device) - if world_size > 1: - logging.info("Using DDP") - model = DDP(model, device_ids=[rank], find_unused_parameters=True) - - adapter_names = [] - adapter_param = [] - for n, p in model.named_parameters(): - if 'adapters' in n:# or 'joiner' in n or 'simple' in n or 'ctc' in n: - adapter_names.append(n) - adapter_param.append(p) - elif 'joiner' in n or 'simple' in n or 'ctc' in n: - p.requires_grad = True - else: - p.requires_grad = False - - optimizer_adapter = ScaledAdam( - adapter_param, - lr=params.adapter_lr, - clipping_scale=5.0, - parameters_names=[adapter_names], - ) - scheduler_adapter = Eden(optimizer_adapter, 10000, 7) #params.lr_batche, params.lr_epochs) - - optimizer, scheduler = optimizer_adapter, scheduler_adapter - - librispeech = LibriSpeechAsrDataModule(args) - librispeech_uda = LibriSpeechAsrDataModule(args) - - ''' - if params.hpo: - train_cuts = librispeech.train_clean_10_cuts(option=params.gender) - else: - train_cuts = librispeech.train_clean_100_cuts(option=params.gender) - if params.full_libri: - train_cuts += librispeech.train_clean_360_cuts(option=params.gender) - train_cuts += librispeech.train_other_500_cuts(option=params.gender) - ''' - - #train_cuts = librispeech.train_clean_10_cuts(option='male') - #train_cuts = librispeech.test_clean_user(option='big') - train_cuts = librispeech.vox_cuts(option=params.spk_id) - train_cuts_uda = librispeech_uda.vox_cuts(option=params.spk_id) - - def remove_short_and_long_utt(c: Cut): - return 1.0 <= c.duration <= 20.0 - - train_cuts = train_cuts.filter(remove_short_and_long_utt) - train_cuts_uda = train_cuts_uda.filter(remove_short_and_long_utt) - - sampler_state_dict = None - - train_dl = librispeech.train_dataloaders( - train_cuts, sampler_state_dict=sampler_state_dict - ) - train_dl_uda = librispeech.train_dataloaders( - train_cuts_uda, sampler_state_dict=sampler_state_dict - ) - - #train_dl = librispeech.test_dataloaders( - # train_cuts - #) - - ''' - print('\n'*5) - print('-'*30) - for batch in train_dl: - print(batch) - print('-'*30) - print('\n'*5) - exit() - ''' - - valid_cuts = librispeech.dev_clean_cuts(option=params.gender) - valid_cuts += librispeech.dev_other_cuts(option=params.gender) - valid_dl = librispeech.valid_dataloaders(valid_cuts) - - scaler = GradScaler(enabled=params.use_fp16, init_scale=1.0) - - for epoch in range(params.start_epoch, params.num_epochs + 1): - scheduler.step_epoch(epoch - 1) - fix_random_seed(params.seed + epoch - 1) - train_dl.sampler.set_epoch(epoch - 1) - - if tb_writer is not None: - tb_writer.add_scalar("train/epoch", epoch, params.batch_idx_train) - - params.cur_epoch = epoch - - train_one_epoch( - params=params, - model=model, - model_avg=model_avg, - optimizer=optimizer, - scheduler=scheduler, - sp=sp, - train_dl=[train_dl, train_dl_uda], - valid_dl=valid_dl, - scaler=scaler, - tb_writer=tb_writer, - world_size=world_size, - rank=rank, - wb=wb, - ) - - if params.print_diagnostics: - diagnostic.print_diagnostics() - break - - if epoch % 10 == 0: - save_checkpoint( - params=params, - model=model, - model_avg=model_avg, - optimizer=optimizer, - scheduler=scheduler, - sampler=train_dl.sampler, - scaler=scaler, - rank=rank, - ) - - logging.info("Done!") - - if world_size > 1: - torch.distributed.barrier() - cleanup_dist() - - - -def display_and_save_batch( - batch: dict, - params: AttributeDict, - sp: spm.SentencePieceProcessor, -) -> None: - """Display the batch statistics and save the batch into disk. - - Args: - batch: - A batch of data. See `lhotse.dataset.K2SpeechRecognitionDataset()` - for the content in it. - params: - Parameters for training. See :func:`get_params`. - sp: - The BPE model. - """ - from lhotse.utils import uuid4 - - filename = f"{params.exp_dir}/batch-{uuid4()}.pt" - logging.info(f"Saving batch to {filename}") - torch.save(batch, filename) - - supervisions = batch["supervisions"] - features = batch["inputs"] - - logging.info(f"features shape: {features.shape}") - - y = sp.encode(supervisions["text"], out_type=int) - num_tokens = sum(len(i) for i in y) - logging.info(f"num tokens: {num_tokens}") - - -def scan_pessimistic_batches_for_oom( - model: Union[nn.Module, DDP], - train_dl: torch.utils.data.DataLoader, - optimizer: torch.optim.Optimizer, - sp: spm.SentencePieceProcessor, - params: AttributeDict, -): - from lhotse.dataset import find_pessimistic_batches - - logging.info( - "Sanity check -- see if any of the batches in epoch 1 would cause OOM." - ) - batches, crit_values = find_pessimistic_batches(train_dl.sampler) - for criterion, cuts in batches.items(): - batch = train_dl.dataset[cuts] - try: - with torch.cuda.amp.autocast(enabled=params.use_fp16): - loss, _ = compute_loss( - params=params, - model=model, - sp=sp, - batch=batch, - is_training=True, - ) - loss.backward() - optimizer.zero_grad() - except Exception as e: - if "CUDA out of memory" in str(e): - logging.error( - "Your GPU ran out of memory with the current " - "max_duration setting. We recommend decreasing " - "max_duration and trying again.\n" - f"Failing criterion: {criterion} " - f"(={crit_values[criterion]}) ..." - ) - display_and_save_batch(batch, params=params, sp=sp) - raise - logging.info( - f"Maximum memory allocated so far is {torch.cuda.max_memory_allocated()//1000000}MB" - ) - - -def main(): - parser = get_parser() - LibriSpeechAsrDataModule.add_arguments(parser) - args = parser.parse_args() - if args.wandb: args.exp_dir = args.exp_dir + str(random.randint(0,400)) - args.exp_dir = Path(args.exp_dir) - - logging.info("save arguments to config.yaml...") - save_args(args) - - if args.wandb: wb = wandb.init(project="d2v-adapter", entity="dohe0342", config=vars(args)) - else: wb = None - - world_size = args.world_size - assert world_size >= 1 - if world_size > 1: - mp.spawn(run if not args.add_adapter else run_adapter, - args=(world_size, args, wb), - nprocs=world_size, - join=True - ) - else: - if not args.add_adapter: run(rank=0, world_size=1, args=args, wb=wb) - else: run(rank=0, world_size=1, args=args, wb=wb) - -torch.set_num_threads(1) -torch.set_num_interop_threads(1) - -if __name__ == "__main__": - main()