import argparse
import importlib
import json
import math

import faiss
import numpy as np
from hazm import Normalizer
from tqdm import tqdm

normalizer = Normalizer()

def load_dataset(input_file):
    with open(input_file, "r", encoding="utf-8") as f:
        # Cap the evaluation set at the first 1000 examples.
        dataset = json.load(f)[:1000]
    return dataset
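
# Each dataset item is expected to look roughly like the following (inferred
# from how the fields are used below; the authoritative schema comes from
# generate_dataset.py):
# {
#     "question": "...",
#     "passage_positive": ["relevant chunk", ...],
#     "passage_negative": ["irrelevant chunk", ...]
# }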

def calculate_ndcg(scores, n):
    def calculate_dcg(scores, n):
        dcg = 0.0
        for i in range(n):
            gain = (2 ** scores[i]) - 1
            discount = math.log2(i + 2)
            dcg += gain / discount
        return dcg

    def calculate_idcg(scores, n):
        # Ideal DCG: the same relevance labels sorted best-first.
        return calculate_dcg(sorted(scores, reverse=True), n)

    dcg = calculate_dcg(scores, n)
    idcg = calculate_idcg(scores, n)
    if idcg == 0:
        # No relevant passage was retrieved at all.
        return 0.0
    return dcg / idcg
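
# Worked example (illustrative): for scores = [1, 0, 1],
#   DCG  = 1/log2(2) + 0/log2(3) + 1/log2(4) = 1.5
#   IDCG = DCG of [1, 1, 0] = 1/log2(2) + 1/log2(3) ≈ 1.631
#   NDCG = 1.5 / 1.631 ≈ 0.92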

def calculate_recall(scores):
    try:
        num_ground_truth = scores.count(1)
        recall_7 = scores[:7].count(1) / num_ground_truth
        recall_12 = scores[:12].count(1) / num_ground_truth
        recall_20 = scores[:20].count(1) / num_ground_truth
        # R-precision variant: hits within the top R results,
        # where R is the number of relevant passages.
        recall_variant = scores[:num_ground_truth].count(1) / num_ground_truth
        return recall_7, recall_12, recall_20, recall_variant
    except ZeroDivisionError:
        # No relevant passage among the candidates.
        return 0, 0, 0, 0

def calculate_precision(scores):
    precision_7 = scores[:7].count(1) / 7
    precision_12 = scores[:12].count(1) / 12
    precision_20 = scores[:20].count(1) / 20
    return precision_7, precision_12, precision_20
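
# Illustrative example: with scores = [1, 1, 0, 0, 1, 0, 0] and 3 relevant
# passages in total, recall@7 = 3/3 = 1.0 and precision@7 = 3/7 ≈ 0.43,
# while the R-precision variant looks only at the top 3 results: 2/3 here.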

def preprocess_reranker(text: str, preprocess: bool = True, add_extra_word: bool = False):
    if preprocess:
        text = text.replace("\n", ".")
        text = normalizer.normalize(text)
    if add_extra_word:
        # Append domain-specific Persian keywords
        # ("Leader of the Islamic Revolution, Imam Khamenei").
        text += " رهبر انقلاب اسلامی حضرت امام خامنه ای "
    return text
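
# A minimal sketch of the model interface this script expects (hypothetical;
# the reference implementation lives in evaluation/models/sample_model.py):
#
# class model:
#     def embed_texts(self, texts, query_is=False):
#         """Return one embedding vector per input text; query_is=True marks
#         queries, for models with asymmetric query/passage encoders."""
#         ...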

def run(input_file, model_name):
    # Dynamically import the model class from evaluation/models/<model_name>.py.
    module = importlib.import_module("evaluation.models." + model_name)
    model = module.model()
    ndcg_scores = []
    recall_7_scores = []
    recall_12_scores = []
    recall_20_scores = []
    recall_variant_scores = []
    precision_7_scores = []
    precision_12_scores = []
    precision_20_scores = []
    all_dataset = load_dataset(input_file)  # already capped at 1000 examples
    batch_size = 100
    len_dataset = len(all_dataset)
    all_dataset_embeddings = [{'question_embedding': None, 'passage_positive_embedding': []} for _ in range(len_dataset)]
    all_embeddings = []
    all_texts = []
print("calculate question embeddings")
# calculate question embeddings
for i in tqdm(range(0, len_dataset, batch_size)):
question_list = []
for id in range(i, min(i + batch_size, len_dataset)):
question_list.append(all_dataset[id]['question'])
question_embeddings = model.embed_texts(question_list, query_is=True)
count = 0
for id in range(i, min(i + batch_size, len_dataset)):
all_dataset_embeddings[id]['question_embedding'] = question_embeddings[count]
count += 1
print("calculate passage positive embeddings")
# calculate passage positive embeddings
for i in tqdm(range(0, len_dataset, batch_size)):
passage_positive_list = []
for id in range(i, min(i + batch_size, len_dataset)):
for passage in all_dataset[id]['passage_positive']:
passage_positive_list.append(passage)
passage_positive_embeddings = model.embed_texts(passage_positive_list)
count = 0
for id in range(i, min(i + batch_size, len_dataset)):
for passage_id in range(len(all_dataset[id]['passage_positive'])):
all_dataset_embeddings[id]['passage_positive_embedding'].append(passage_positive_embeddings[count])
all_embeddings.append(passage_positive_embeddings[count])
all_texts.append(all_dataset[id]['passage_positive'][passage_id])
count += 1
print("calculate passage negative embeddings")
# calculate passage negative embeddings
for i in tqdm(range(0, len_dataset, batch_size)):
passage_negative_list = []
for id in range(i, min(i + batch_size, len_dataset)):
for passage in all_dataset[id]['passage_negative']:
passage_negative_list.append(passage)
passage_negative_embeddings = model.embed_texts(passage_negative_list)
count = 0
for id in range(i, min(i + batch_size, len_dataset)):
for passage_id in range(len(all_dataset[id]['passage_negative'])):
all_embeddings.append(passage_negative_embeddings[count])
all_texts.append(all_dataset[id]['passage_negative'][passage_id])
count += 1
    # Build a FAISS index over all positive and negative passage embeddings.
    all_embeddings = np.array(all_embeddings, dtype=np.float32)
    print(f"all_embeddings shape: {all_embeddings.shape}")
    dim = all_embeddings.shape[1]
    index = faiss.IndexFlatIP(dim)
    # L2-normalize so inner-product search is equivalent to cosine similarity.
    faiss.normalize_L2(all_embeddings)
    index.add(all_embeddings)
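
    # Since both the indexed vectors and the query vectors (below) are
    # L2-normalized, IndexFlatIP scores are cosine similarities in [-1, 1].
    # Quick sanity check (sketch only, not part of the evaluation):
    #   probe = all_embeddings[:1].copy()
    #   s, _ = index.search(probe, 1)
    #   assert abs(s[0][0] - 1.0) < 1e-4  # a vector is most similar to itself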

    top_k = 20  # retrieve enough candidates for the @12 and @20 cutoffs
    for count, data in enumerate(tqdm(all_dataset)):
        # Retrieve the top-k chunks for this question.
        question_embedding = all_dataset_embeddings[count]['question_embedding']
        question_embedding_normalized = np.array([question_embedding], dtype=np.float32)
        faiss.normalize_L2(question_embedding_normalized)
        scores_embed, ids_embed = index.search(question_embedding_normalized, top_k)
        chunks = [all_texts[id] for id in ids_embed[0]]
        # Binary relevance: 1 if the retrieved chunk is one of this
        # question's positive passages (matched by exact text).
        scores_llm = []
        for chunk in chunks:
            if chunk in data["passage_positive"]:
                scores_llm.append(1)
            else:
                scores_llm.append(0)
# print(f"question {count}: {question}")
# for i in range(len(scores_embed)):
# print(f"chunk {i}: scores_embed {scores_embed[i]}, scores_llm {scores_llm[i]}")
# print("--------------------------------\n")
        # index.search returns (1, top_k) arrays, hence scores_embed[0].
        # Re-sort by embedding score, descending (FAISS already returns
        # results sorted, so this is mostly a safeguard).
        sorted_pairs = sorted(zip(scores_embed[0], scores_llm), reverse=True)
        scores = [rel for _, rel in sorted_pairs]
        # NDCG over the full retrieved list.
        ndcg = calculate_ndcg(scores, len(scores))
        ndcg_scores.append(ndcg)
        # Recall at several cutoffs.
        recall_7, recall_12, recall_20, recall_variant = calculate_recall(scores)
        recall_7_scores.append(recall_7)
        recall_12_scores.append(recall_12)
        recall_20_scores.append(recall_20)
        recall_variant_scores.append(recall_variant)
        # Precision at several cutoffs.
        precision_7, precision_12, precision_20 = calculate_precision(scores)
        precision_7_scores.append(precision_7)
        precision_12_scores.append(precision_12)
        precision_20_scores.append(precision_20)
print(f"NDCG: {sum(ndcg_scores)/len(ndcg_scores)}")
print(f"Recall 7: {sum(recall_7_scores)/len(recall_7_scores)}")
print(f"Recall 12: {sum(recall_12_scores)/len(recall_12_scores)}")
print(f"Recall 20: {sum(recall_20_scores)/len(recall_20_scores)}")
print(f"Recall Variant: {sum(recall_variant_scores)/len(recall_variant_scores)}")
print(f"Precision 7: {sum(precision_7_scores)/len(precision_7_scores)}")
print(f"Precision 12: {sum(precision_12_scores)/len(precision_12_scores)}")
print(f"Precision 20: {sum(precision_20_scores)/len(precision_20_scores)}")

def main():
    """
    1. Generate a JSON dataset from your questions with generate_dataset.py
       and pass its path as --input_file.
    2. Create your model class in the ./models folder, following sample_model.py.
    3. Run the script:
       python evaluate.py --input_file <path_to_your_json_file> --model <model_module_name>
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_file', help='path to the JSON input file')
    parser.add_argument('--model', help='name of the model module in evaluation/models')
    args = parser.parse_args()
    print(f"Start evaluating model {args.model} (with normalizer and extra words) on input file {args.input_file}")
    run(args.input_file, args.model)

if __name__ == "__main__":
    exit(main())
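
# Example invocation (hypothetical file and module names):
#   python evaluate.py --input_file data/eval.json --model sample_model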