import asyncio
import json
import re
import time

import pandas as pd

# NOTE: aiohttp and tqdm are imported lazily inside the methods that use them,
# so the module stays importable (e.g. for the pure sanitizing helper) in
# environments where those packages are not installed.


class TopicRecreation:
    """Assign a Persian topic category to each tweet in an Excel sheet by
    querying an OpenAI-compatible chat-completions endpoint.

    Workflow: read tweets from Excel, label them in concurrent batches via
    the LLM service, strip Excel-hostile control characters from the labels,
    and write the result back to Excel.
    """

    # Service knobs hoisted out of the method bodies so a deployment can
    # override them on the class (or a single instance) without edits.
    API_URL = "http://192.168.130.206:4001/v1/chat/completions"
    MODEL = "google/gemma-3-27b-it"
    MAX_TOKENS = 500

    # Invisible / bidi control characters that confuse Excel rendering:
    # ZWNJ, ZWJ, LRM, RLM, LRE, RLE, PDF, LRO, RLO, BOM, Tatweel.
    # str.translate removes them all in one C-level pass.
    _EXCEL_UNSAFE = dict.fromkeys(
        map(ord, "\u200c\u200d\u200e\u200f\u202a\u202b\u202c\u202d\u202e\ufeff\u0640")
    )

    def __init__(self):
        # System prompt sent with every request. Kept verbatim (including
        # original typos) — changing it would change the model's output.
        self.instruction = """
        You will be given a tweet text. Your task is to write a phrase category for this tweet which tweet is related to it.
        this should be a combination of action + category : for example :
        انتقاد از سیاست ایران
        توهین به مقامات کشور
        حمایت از نظام جمهوری اسلامی
        جنگ اسراییل و قطر
        مسایل مربوط به موضوع هسته ای ایران
        مسایل مربوط به افغانستان
        The category should be in persian.

        # Roles
        - If it does not have specifc meaning then write "متفرقه"
        - Be specifc about the countries.
        - Do not be specifc about the people.
        - you can consider different categories and write an action + category or just simple category

        Just return the category, do not include any other text.
        """

    async def run_llm(self, session, tweet):
        """Ask the LLM for a category for a single tweet.

        Args:
            session: An open ``aiohttp.ClientSession``.
            tweet: Raw tweet text; @-mention tokens are stripped before sending.

        Returns:
            The model's category string, or "" when the request fails — so one
            bad request cannot abort the whole gathered batch.
        """
        headers = {"Content-Type": "application/json"}
        # Drop tokens containing '@' (mentions) — no topical signal.
        tweet = " ".join(tok for tok in tweet.split(" ") if "@" not in tok)
        # json.dumps produces valid JSON even when the tweet contains quotes,
        # backslashes or newlines (the hand-built f-string it replaces did not).
        input_message = json.dumps({"tweet": tweet}, ensure_ascii=False)
        payload = {
            "model": self.MODEL,
            "messages": [
                {"role": "system", "content": self.instruction},
                {"role": "user", "content": input_message},
            ],
            "max_tokens": self.MAX_TOKENS,
        }
        try:
            async with session.post(self.API_URL, headers=headers, json=payload) as resp:
                resp.raise_for_status()
                response = await resp.json()
                return response["choices"][0]["message"]["content"]
        except Exception as e:  # best-effort batch labelling: log and continue
            print(f"Error in llm as reranker: {e}")
            return ""

    async def run_llm_async(self, tweets):
        """Send one request per tweet concurrently.

        Args:
            tweets: Iterable of tweet texts.

        Returns:
            List of category strings, in the same order as ``tweets``.
        """
        import aiohttp  # lazy: only needed when actually calling the service

        async with aiohttp.ClientSession() as session:
            tasks = [self.run_llm(session, t) for t in tweets]
            return await asyncio.gather(*tasks)

    def sanitize_for_excel(self, df):
        """Return a copy of *df* whose "category" column has zero-width and
        bidi control characters removed and runs of whitespace collapsed.

        Args:
            df: DataFrame containing a "category" column.

        Returns:
            A sanitized copy; the input DataFrame is not modified.
        """

        def _clean(text):
            # None becomes "" so Excel cells never read "None".
            if text is None:
                return ""
            s = str(text).translate(self._EXCEL_UNSAFE)
            return re.sub(r"\s+", " ", s).strip()

        out = df.copy()
        for col in ("category",):
            # Series.map works for any index; the positional .loc loop it
            # replaces silently required a default RangeIndex.
            out[col] = out[col].map(_clean)
        return out

    def start_process(self, input_path, output_path, batch_size=1000, pause_seconds=5):
        """Label every tweet in *input_path* and write results to *output_path*.

        Args:
            input_path: Excel file with a "tweet" column.
            output_path: Destination Excel file; gains a "category" column.
            batch_size: Tweets labelled per concurrent batch (default 1000).
            pause_seconds: Cool-down between batches so the endpoint is not
                hammered back-to-back (default 5).
        """
        from tqdm import tqdm  # lazy: progress bar only needed here

        df = pd.read_excel(input_path)
        df_copy = df.copy()
        tweets = df["tweet"].tolist()
        for start in tqdm(range(0, len(tweets), batch_size)):
            t0 = time.time()
            batch_results = asyncio.run(self.run_llm_async(tweets[start:start + batch_size]))
            print(f"Time taken for llm as reranker: {time.time() - t0} seconds")
            time.sleep(pause_seconds)
            for offset, category in enumerate(batch_results):
                df_copy.at[start + offset, "category"] = category
        df_copy = self.sanitize_for_excel(df_copy)
        # index=False: do not write a spurious unnamed index column.
        df_copy.to_excel(output_path, index=False)


if __name__ == "__main__":
    llm = TopicRecreation()
    llm.start_process(
        "/home/firouzi/trend_grouping_new/tweet_topic.xlsx",
        "/home/firouzi/trend_grouping_new/tweet_topic_recreation.xlsx",
    )