#!/usr/bin/env python3
# Copyright 2023 Xiaomi Corp. (authors: Fangjun Kuang, Zengwei Yao)
#
# See ../../../../LICENSE for clarification regarding multiple authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
|
|
This file merge overlapped chunks into utterances accroding to recording ids.
|
|
"""
import argparse
import logging
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List

import sentencepiece as spm
from lhotse import (
    CutSet,
    MonoCut,
    SupervisionSegment,
    SupervisionSet,
    load_manifest,
    load_manifest_lazy,
)
from lhotse.cut import Cut
from lhotse.serialization import SequentialJsonlWriter


def get_parser():
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--bpe-model",
        type=str,
        default="data/lang_bpe_500/bpe.model",
        help="Path to the BPE model",
    )

    parser.add_argument(
        "--manifest-in-dir",
        type=Path,
        default=Path("data/librilight/manifests_chunk_recog"),
        help="Path to directory of chunk cuts with recognition results.",
    )

    parser.add_argument(
        "--manifest-out-dir",
        type=Path,
        default=Path("data/manifests"),
        help="Path to directory to save full utterances obtained by merging overlapped chunks.",
    )

    parser.add_argument(
        "--extra",
        type=float,
        default=2.0,
        help="""Extra duration (in seconds) to drop at both sides of each chunk.""",
    )

    return parser.parse_args()


def merge_chunks(
    cuts_chunk: CutSet,
    supervisions: SupervisionSet,
    cuts_writer: SequentialJsonlWriter,
    sp: spm.SentencePieceProcessor,
    extra: float,
) -> int:
    """Merge chunk-wise cuts according to recording ids.

    Args:
      cuts_chunk:
        The chunk-wise cuts opened in a lazy mode.
      supervisions:
        The supervision manifest containing text file path, opened in a lazy mode.
      cuts_writer:
        Writer to save the cuts with recognition results.
      sp:
        The BPE model.
      extra:
        Extra duration (in seconds) to drop at both sides of each chunk.
    """

    # Background worker to save merged cuts to disk.
    def _save_worker(utt_cut: Cut, flush=False):
        cuts_writer.write(utt_cut, flush=flush)

    def _merge(cut_list: List[Cut], rec_id: str, utt_idx: int):
        """Merge chunks with the same recording_id."""
        for cut in cut_list:
            assert cut.recording.id == rec_id, (cut.recording.id, rec_id)

        # For each group with the same recording, sort it according to the start time.
        # In fact, we don't need to do this since the cuts have been sorted
        # according to the start time.
        cut_list = sorted(cut_list, key=(lambda cut: cut.start))

        rec = cut_list[0].recording
        alignments = []
        cur_end = 0
        for cut in cut_list:
            # Get left and right borders of the region kept from this chunk
            left = cut.start + extra if cut.start > 0 else 0
            chunk_end = cut.start + cut.duration
            right = chunk_end - extra if chunk_end < rec.duration else rec.duration

            # Assert the chunks are continuous
            assert left == cur_end, (left, cur_end)
            cur_end = right

            assert len(cut.supervisions) == 1, len(cut.supervisions)
            for ali in cut.supervisions[0].alignment["symbol"]:
                t = ali.start + cut.start
                if left <= t < right:
                    alignments.append(ali.with_offset(cut.start))

        old_sup = supervisions[rec_id]
        # Assuming the supervisions are sorted with the same recording order as in cuts_chunk
        # old_sup = supervisions[utt_idx]
        assert old_sup.recording_id == rec_id, (old_sup.recording_id, rec_id)

        new_sup = SupervisionSegment(
            id=rec_id,
            recording_id=rec_id,
            start=0,
            duration=rec.duration,
            alignment={"symbol": alignments},
            language=old_sup.language,
            speaker=old_sup.speaker,
        )

        utt_cut = MonoCut(
            id=rec_id,
            start=0,
            duration=rec.duration,
            channel=0,
            recording=rec,
            supervisions=[new_sup],
        )
        # Set a custom attribute to the cut
        utt_cut.text_path = old_sup.book

        return utt_cut

    last_rec_id = None
    cut_list = []
    utt_idx = 0

    futures = []
    with ThreadPoolExecutor(max_workers=1) as executor:
        for cut in cuts_chunk:
            cur_rec_id = cut.recording.id
            if len(cut_list) == 0:
                # Case of the first cut
                last_rec_id = cur_rec_id
                cut_list.append(cut)
            elif cur_rec_id == last_rec_id:
                cut_list.append(cut)
            else:
                # Case of a cut belonging to a new recording
                utt_cut = _merge(cut_list, last_rec_id, utt_idx)
                utt_idx += 1

                futures.append(executor.submit(_save_worker, utt_cut))

                last_rec_id = cur_rec_id
                cut_list = [cut]

                if utt_idx % 5000 == 0:
                    logging.info(f"Processed {utt_idx} utterances.")

        # For the cuts belonging to the last recording
        if len(cut_list) != 0:
            utt_cut = _merge(cut_list, last_rec_id, utt_idx)
            utt_idx += 1

            futures.append(executor.submit(_save_worker, utt_cut))
        logging.info("Finished")

        for f in futures:
            f.result()

    return utt_idx
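
# A hedged follow-up sketch (not part of the original script): once a subset has
# been merged, the resulting manifest could be spot-checked with lhotse, e.g.
#
#   from lhotse import load_manifest_lazy
#   cuts = load_manifest_lazy("data/manifests/librilight_cuts_small.jsonl.gz")
#   cut = next(iter(cuts))
#   print(cut.duration, len(cut.supervisions[0].alignment["symbol"]))
#
# The path above is only the default --manifest-out-dir combined with the
# "small" subset name used in main() below.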


def main():
    args = get_parser()

    sp = spm.SentencePieceProcessor()
    sp.load(args.bpe_model)

    # It contains "librilight_recordings_*.jsonl.gz" and "librilight_supervisions_*.jsonl.gz"
    manifest_out_dir = args.manifest_out_dir

    subsets = ["small", "medium", "large"]

    for subset in subsets:
        logging.info(f"Processing {subset} subset")

        manifest_out = manifest_out_dir / f"librilight_cuts_{subset}.jsonl.gz"
        if manifest_out.is_file():
            logging.info(f"{manifest_out} already exists - skipping.")
            continue

        supervisions = load_manifest(
            manifest_out_dir / f"librilight_supervisions_{subset}.jsonl.gz"
        )  # We will use the text path from supervisions

        cuts_chunk = load_manifest_lazy(
            args.manifest_in_dir / f"librilight_cuts_{subset}.jsonl.gz"
        )

        cuts_writer = CutSet.open_writer(manifest_out, overwrite=True)
        num_utt = merge_chunks(
            cuts_chunk, supervisions, cuts_writer=cuts_writer, sp=sp, extra=args.extra
        )
        cuts_writer.close()
        logging.info(f"{num_utt} cuts saved to {manifest_out}")


if __name__ == "__main__":
    formatter = "%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s"
    logging.basicConfig(format=formatter, level=logging.INFO)

    main()