icefall/egs/LJSpeech/ASR/local/prepare_userlibri.py
2023-01-19 11:27:19 +09:00

255 lines
9.7 KiB
Python
Executable File

import logging
import os
import re
import shutil
import tarfile
import zipfile
from concurrent.futures.thread import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple, Union
from tqdm.auto import tqdm
from lhotse import validate_recordings_and_supervisions
from lhotse.audio import Recording, RecordingSet
from lhotse.recipes.utils import manifests_exist, read_manifests_if_cached
from lhotse.supervision import AlignmentItem, SupervisionSegment, SupervisionSet
from lhotse.utils import (
Pathlike,
is_module_available,
safe_extract,
urlretrieve_progress,
)
# LIBRISPEECH_ALIGNMENTS_URL = (
# "https://drive.google.com/uc?id=1WYfgr31T-PPwMcxuAq09XZfHQO5Mw8fE"
# )
# def download_librispeech(
# target_dir: Pathlike = ".",
# dataset_parts: Optional[Union[str, Sequence[str]]] = "mini_librispeech",
# force_download: bool = False,
# alignments: bool = False,
# base_url: str = "http://www.openslr.org/resources",
# alignments_url: str = LIBRISPEECH_ALIGNMENTS_URL,
# ) -> Path:
# """
# Download and untar the dataset, supporting both LibriSpeech and MiniLibrispeech
# :param target_dir: Pathlike, the path of the dir to storage the dataset.
# :param dataset_parts: "librispeech", "mini_librispeech",
# or a list of splits (e.g. "dev-clean") to download.
# :param force_download: Bool, if True, download the tars no matter if the tars exist.
# :param alignments: should we download the alignments. The original source is:
# https://github.com/CorentinJ/librispeech-alignments
# :param base_url: str, the url of the OpenSLR resources.
# :param alignments_url: str, the url of LibriSpeech word alignments
# :return: the path to downloaded and extracted directory with data.
# """
# target_dir = Path(target_dir)
# corpus_dir = target_dir / "LibriSpeech"
# target_dir.mkdir(parents=True, exist_ok=True)
# if dataset_parts == "librispeech":
# dataset_parts = LIBRISPEECH
# elif dataset_parts == "mini_librispeech":
# dataset_parts = MINI_LIBRISPEECH
# elif isinstance(dataset_parts, str):
# dataset_parts = [dataset_parts]
# for part in tqdm(dataset_parts, desc="Downloading LibriSpeech parts"):
# logging.info(f"Processing split: {part}")
# # Determine the valid URL for a given split.
# if part in LIBRISPEECH:
# url = f"{base_url}/12"
# elif part in MINI_LIBRISPEECH:
# url = f"{base_url}/31"
# else:
# logging.warning(f"Invalid dataset part name: {part}")
# continue
# # Split directory exists and seem valid? Skip this split.
# part_dir = corpus_dir / part
# completed_detector = part_dir / ".completed"
# if completed_detector.is_file():
# logging.info(f"Skipping {part} because {completed_detector} exists.")
# continue
# # Maybe-download the archive.
# tar_name = f"{part}.tar.gz"
# tar_path = target_dir / tar_name
# if force_download or not tar_path.is_file():
# urlretrieve_progress(
# f"{url}/{tar_name}", filename=tar_path, desc=f"Downloading {tar_name}"
# )
# # Remove partial unpacked files, if any, and unpack everything.
# shutil.rmtree(part_dir, ignore_errors=True)
# with tarfile.open(tar_path) as tar:
# safe_extract(tar, path=target_dir)
# completed_detector.touch()
# if alignments:
# completed_detector = target_dir / ".ali_completed"
# if completed_detector.is_file() and not force_download:
# return corpus_dir
# assert is_module_available(
# "gdown"
# ), 'To download LibriSpeech alignments, please install "pip install gdown"'
# import gdown
# ali_zip_path = str(target_dir / "LibriSpeech-Alignments.zip")
# gdown.download(alignments_url, output=ali_zip_path)
# with zipfile.ZipFile(ali_zip_path) as f:
# f.extractall(path=target_dir)
# completed_detector.touch()
# return corpus_dir
def prepare_userlibri(
corpus_dir: str,
dataset_parts: str = "auto",
output_dir: str = None,
num_jobs: int = 1,
) -> Dict[str, Dict[str, Union[RecordingSet, SupervisionSet]]]:
"""
Returns the manifests which consist of the Recordings and Supervisions.
When all the manifests are available in the ``output_dir``, it will simply read and return them.
:param corpus_dir: Pathlike, the path of the data dir.
:param dataset_parts: string or sequence of strings representing dataset part names, e.g. 'train-clean-100', 'train-clean-5', 'dev-clean'.
By default we will infer which parts are available in ``corpus_dir``.
:param output_dir: Pathlike, the path where to write the manifests.
:return: a Dict whose key is the dataset part, and the value is Dicts with the keys 'audio' and 'supervisions'.
"""
# corpus_audio_dir = Path(corpus_dir + "/audio_data")
# corpus_lm_dir = Path(corpus_dir + "/lm_data")
# corpus_dir = Path(corpus_dir)
corpus_dir = Path(corpus_dir + "/audio_data")
assert corpus_dir.is_dir(), f"No such directory: {corpus_dir}"
spkwise_parent = corpus_dir / "speaker-wise-test"
spks = os.listdir(spkwise_parent)
bookwise_parent = corpus_dir / "book-wise-test"
books = os.listdir(bookwise_parent)
spks_parts = (
set(spks)
)
books_parts = (
set(books)
)
manifests = {}
for s_or_b, dataset_parts in zip(["speaker-wise-test", "book-wise-test"], [spks_parts, books_parts]):
if output_dir is not None:
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Maybe the manifests already exist: we can read them and save a bit of preparation time.
manifests = read_manifests_if_cached(
dataset_parts=dataset_parts, output_dir=output_dir
)
with ThreadPoolExecutor(num_jobs) as ex:
for part in tqdm(dataset_parts, desc="Dataset parts"):
logging.info(f"Processing UserLibri subset: {part}")
if manifests_exist(part=part, output_dir=output_dir):
logging.info(f"UserLibri subset: {part} already prepared - skipping.")
continue
recordings = []
supervisions = []
part_path = corpus_dir / s_or_b / part
futures = []
for trans_path in tqdm(
part_path.rglob("*.trans.txt"), desc="Distributing tasks", leave=False
):
alignments = {}
with open(trans_path) as f:
for line in f:
futures.append(
ex.submit(parse_utterance, trans_path.parent, line, alignments)
)
for future in tqdm(futures, desc="Processing", leave=False):
result = future.result()
if result is None:
continue
recording, segment = result
recordings.append(recording)
supervisions.append(segment)
recording_set = RecordingSet.from_recordings(recordings)
supervision_set = SupervisionSet.from_segments(supervisions)
validate_recordings_and_supervisions(recording_set, supervision_set)
if output_dir is not None:
supervision_set.to_file(
output_dir / f"userlibri_supervisions_{part}.jsonl.gz"
)
recording_set.to_file(
output_dir / f"userlibri_recordings_{part}.jsonl.gz"
)
manifests[part] = {
"recordings": recording_set,
"supervisions": supervision_set,
}
return manifests
def parse_utterance(
dataset_split_path: Path,
line: str,
alignments: Dict[str, List[AlignmentItem]],
) -> Optional[Tuple[Recording, SupervisionSegment]]:
recording_id, text = line.strip().split(maxsplit=1)
# Create the Recording first
audio_path = (
dataset_split_path
/ f"{recording_id}.flac"
)
if not audio_path.is_file():
logging.warning(f"No such file: {audio_path}")
return None
recording = Recording.from_file(audio_path, recording_id=recording_id)
# Then, create the corresponding supervisions
segment = SupervisionSegment(
id=recording_id,
recording_id=recording_id,
start=0.0,
duration=recording.duration,
channel=0,
language="English",
speaker=re.sub(r"-.*", r"", recording.id),
text=text.strip(),
alignment={"word": alignments[recording_id]}
if recording_id in alignments
else None,
)
return recording, segment
def parse_alignments(ali_path: Pathlike) -> Dict[str, List[AlignmentItem]]:
alignments = {}
for line in Path(ali_path).read_text().splitlines():
utt_id, words, timestamps = line.split()
words = words.replace('"', "").split(",")
timestamps = [0.0] + list(map(float, timestamps.replace('"', "").split(",")))
alignments[utt_id] = [
AlignmentItem(
symbol=word, start=start, duration=round(end - start, ndigits=8)
)
for word, start, end in zip(words, timestamps, timestamps[1:])
]
return alignments
def main():
nj = 15
output_dir = "data/manifests"
corpus_dir = "/DB/UserLibri"
prepare_userlibri(corpus_dir, "auto", output_dir, nj)
main()