import argparse
import json
import math
import importlib

import numpy as np
import faiss
from tqdm import tqdm
from hazm import Normalizer

normalizer = Normalizer()


def load_dataset(input_file):
    # Evaluate on at most the first 1000 examples of the dataset.
    with open(input_file, "r", encoding="utf-8") as f:
        dataset = json.load(f)[:1000]
    return dataset


def calculate_ndcg(scores, n):
    def calculate_dcg(scores, n):
        dcg = 0
        for i in range(n):
            a = (2 ** scores[i]) - 1
            b = math.log2(i + 2)
            dcg += (a / b)
        return dcg

    def calculate_idcg(scores, n):
        # The ideal DCG is the DCG of the relevance labels sorted in descending order.
        new_scores = scores.copy()
        new_scores.sort(reverse=True)
        return calculate_dcg(new_scores, n)

    dcg = calculate_dcg(scores, n)
    idcg = calculate_idcg(scores, n)
    if idcg == 0:
        # No relevant passage was retrieved for this question.
        return 0
    return dcg / idcg


def calculate_recall(scores):
    try:
        num_ground_truth = scores.count(1)
        recall_7 = scores[:7].count(1) / num_ground_truth
        recall_12 = scores[:12].count(1) / num_ground_truth
        recall_20 = scores[:20].count(1) / num_ground_truth
        # R-precision-style variant: recall within the top-k positions,
        # where k equals the number of relevant passages in the ranked list.
        recall_variant = scores[:num_ground_truth].count(1) / num_ground_truth
        return recall_7, recall_12, recall_20, recall_variant
    except ZeroDivisionError:
        # No relevant passage appears in the ranked list.
        return 0, 0, 0, 0


def calculate_precision(scores):
    precision_7 = scores[:7].count(1) / 7
    precision_12 = scores[:12].count(1) / 12
    precision_20 = scores[:20].count(1) / 20
    return precision_7, precision_12, precision_20


def preprocess_reranker(text: str, preprocess: bool = True, add_extra_word: bool = False):
    if preprocess:
        text = text.replace("\n", ".")
        text = normalizer.normalize(text)
    if add_extra_word:
        text += " رهبر انقلاب اسلامی حضرت امام خامنه ای "
    return text


def run(input_file, model):
    # Load the model class from evaluation/models/<model>.py
    module = importlib.import_module("evaluation.models." + model)
    model = module.model()

    ndcg_scores = []
    recall_7_scores = []
    recall_12_scores = []
    recall_20_scores = []
    recall_variant_scores = []
    precision_7_scores = []
    precision_12_scores = []
    precision_20_scores = []

    all_dataset = load_dataset(input_file)
    batch_size = 100
    len_dataset = len(all_dataset)
    all_dataset_embeddings = [{'question_embedding': None, 'passage_positive_embedding': []}
                              for _ in range(len_dataset)]
    all_embeddings = []
    all_texts = []

    print("calculate question embeddings")
    for i in tqdm(range(0, len_dataset, batch_size)):
        question_list = []
        for id in range(i, min(i + batch_size, len_dataset)):
            question_list.append(all_dataset[id]['question'])
        question_embeddings = model.embed_texts(question_list, query_is=True)
        count = 0
        for id in range(i, min(i + batch_size, len_dataset)):
            all_dataset_embeddings[id]['question_embedding'] = question_embeddings[count]
            count += 1

    print("calculate passage positive embeddings")
    for i in tqdm(range(0, len_dataset, batch_size)):
        passage_positive_list = []
        for id in range(i, min(i + batch_size, len_dataset)):
            for passage in all_dataset[id]['passage_positive']:
                passage_positive_list.append(passage)
        passage_positive_embeddings = model.embed_texts(passage_positive_list)
        count = 0
        for id in range(i, min(i + batch_size, len_dataset)):
            for passage_id in range(len(all_dataset[id]['passage_positive'])):
                all_dataset_embeddings[id]['passage_positive_embedding'].append(passage_positive_embeddings[count])
                all_embeddings.append(passage_positive_embeddings[count])
                all_texts.append(all_dataset[id]['passage_positive'][passage_id])
                count += 1

    print("calculate passage negative embeddings")
    for i in tqdm(range(0, len_dataset, batch_size)):
        passage_negative_list = []
        for id in range(i, min(i + batch_size, len_dataset)):
            for passage in all_dataset[id]['passage_negative']:
                passage_negative_list.append(passage)
        passage_negative_embeddings = model.embed_texts(passage_negative_list)
        count = 0
        for id in range(i, min(i + batch_size, len_dataset)):
            for passage_id in range(len(all_dataset[id]['passage_negative'])):
                all_embeddings.append(passage_negative_embeddings[count])
                all_texts.append(all_dataset[id]['passage_negative'][passage_id])
                count += 1

    # Create the FAISS index over all positive and negative passage embeddings.
    # Vectors are L2-normalized so inner product equals cosine similarity.
    all_embeddings = np.array(all_embeddings, dtype=np.float32)
    print(f"all_embeddings shape: {all_embeddings.shape}")
    dim = all_embeddings.shape[1]
    index = faiss.IndexFlatIP(dim)
    faiss.normalize_L2(all_embeddings)
    index.add(all_embeddings)

    for count, data in enumerate(tqdm(all_dataset)):
        # Retrieve the top 20 chunks so that the @12 and @20 metrics are meaningful.
        question_embedding = all_dataset_embeddings[count]['question_embedding']
        question_embedding_normalized = np.array([question_embedding], dtype=np.float32)
        faiss.normalize_L2(question_embedding_normalized)
        scores_embed, ids_embed = index.search(question_embedding_normalized, 20)
        chunks = [all_texts[id] for id in ids_embed[0]]

        # Binary relevance labels: 1 if the retrieved chunk is one of the
        # question's positive passages, else 0.
        scores_llm = []
        for chunk in chunks:
            if chunk in data["passage_positive"]:
                scores_llm.append(1)
            else:
                scores_llm.append(0)

        # print(f"question {count}: {data['question']}")
        # for i in range(len(scores_embed[0])):
        #     print(f"chunk {i}: scores_embed {scores_embed[0][i]}, scores_llm {scores_llm[i]}")
        # print("--------------------------------\n")

        # Order the relevance labels by descending retrieval score.
        sorted_pairs = sorted(zip(scores_embed[0], scores_llm), reverse=True)
        scores = [rel for _, rel in sorted_pairs]

        # NDCG
        ndcg = calculate_ndcg(scores, len(scores))
        ndcg_scores.append(ndcg)

        # Recall
        recall_7, recall_12, recall_20, recall_variant = calculate_recall(scores)
        recall_7_scores.append(recall_7)
        recall_12_scores.append(recall_12)
        recall_20_scores.append(recall_20)
        recall_variant_scores.append(recall_variant)

        # Precision
        precision_7, precision_12, precision_20 = calculate_precision(scores)
        precision_7_scores.append(precision_7)
        precision_12_scores.append(precision_12)
        precision_20_scores.append(precision_20)

    print(f"NDCG: {sum(ndcg_scores)/len(ndcg_scores)}")
    print(f"Recall 7: {sum(recall_7_scores)/len(recall_7_scores)}")
    print(f"Recall 12: {sum(recall_12_scores)/len(recall_12_scores)}")
    print(f"Recall 20: {sum(recall_20_scores)/len(recall_20_scores)}")
    print(f"Recall Variant: {sum(recall_variant_scores)/len(recall_variant_scores)}")
    print(f"Precision 7: {sum(precision_7_scores)/len(precision_7_scores)}")
    print(f"Precision 12: {sum(precision_12_scores)/len(precision_12_scores)}")
    print(f"Precision 20: {sum(precision_20_scores)/len(precision_20_scores)}")


def main():
    """
    - First, give your questions to generate_dataset.py to generate a json file, and pass its path as --input_file.
    - Second, create your model class in the ./models folder, similar to sample_model.py.
    - Third, run the script with the following command:
      python evaluate.py --input_file <json file path> --model <model module name>
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_file', help='json input file path')
    parser.add_argument('--model', help='name of the model module in ./models')
    args = parser.parse_args()
    print(f"Starting to evaluate model {args.model} (with normalizer and extra words) on input file {args.input_file}")
    run(args.input_file, args.model)


if __name__ == "__main__":
    exit(main())
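# ---------------------------------------------------------------------------
# Example model class (illustrative sketch only, not part of this script).
# run() assumes that each module under evaluation/models/ defines a class
# named `model` with an `embed_texts(texts, query_is=False)` method returning
# one embedding vector per input text. A minimal implementation, assuming
# sentence-transformers is installed and using a placeholder checkpoint name,
# might look like the following (see sample_model.py for the real template):
#
#     from sentence_transformers import SentenceTransformer
#
#     class model:
#         def __init__(self):
#             # Placeholder checkpoint; substitute the embedding model under evaluation.
#             self.encoder = SentenceTransformer("intfloat/multilingual-e5-base")
#
#         def embed_texts(self, texts, query_is=False):
#             # E5-style models expect a "query: " / "passage: " prefix.
#             prefix = "query: " if query_is else "passage: "
#             return self.encoder.encode([prefix + t for t in texts])
# ---------------------------------------------------------------------------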