From 8adc69aaa204a2e8327f544bb4d6d1744fb7b61e Mon Sep 17 00:00:00 2001 From: amirhossein Date: Wed, 7 Jan 2026 08:33:10 -0800 Subject: [PATCH] Add Threading and Lock(debug) --- main.py | 403 +++++++++++++++++++++++--------------------------------- 1 file changed, 166 insertions(+), 237 deletions(-) diff --git a/main.py b/main.py index 30a6d05..0b98dfb 100644 --- a/main.py +++ b/main.py @@ -11,99 +11,60 @@ from playwright.sync_api import sync_playwright app = Flask(__name__) +# مسیر کرومیوم CHROMIUM_PATH = "/usr/bin/chromium-browser" if not os.path.exists(CHROMIUM_PATH): CHROMIUM_PATH = "/usr/bin/chromium" # --- قفل برای صف‌بندی درخواست‌های سنگین --- -# این قفل باعث می‌شود لایک/فالو/کامنت یکی‌یکی انجام شوند +# این قفل باعث می‌شود لایک/فالو/کامنت یکی‌یکی انجام شوند اما سرور برای پینگ آزاد باشد TASK_LOCK = threading.Lock() -# مرورگر اصلی (فقط یک بار اجرا می‌شود) -playwright_instance = None -browser = None - # ======================================================= # 🛠️ توابع کمکی (Helper Functions) # ======================================================= -def start_browser_engine(): - """موتور کرومیوم را فقط یکبار روشن می‌کند""" - global playwright_instance, browser - if browser: return - - print("--- Starting Chromium Engine ---") - playwright_instance = sync_playwright().start() - browser = playwright_instance.chromium.launch( - executable_path=CHROMIUM_PATH, - headless=True, # در سرور واقعی True باشد - args=[ - "--no-sandbox", - "--disable-dev-shm-usage", - "--disable-blink-features=AutomationControlled", - "--disable-gpu" - ] - ) +def get_browser_args(): + """تنظیمات بهینه برای اجرا در سرور""" + return [ + "--no-sandbox", + "--disable-dev-shm-usage", + "--disable-blink-features=AutomationControlled", + "--disable-gpu" + ] def parse_proxy(proxy_str): """پشتیبانی هوشمند از انواع فرمت‌های پروکسی""" if not proxy_str: return None - proxy_str = proxy_str.strip() - # 1. اگر فرمت URL بود (مثل چیزی که فرستادی: user:pass@ip:port) + # فرمت URL if "@" in proxy_str: try: - # اضافه کردن http اگر نداشته باشد (برای اینکه urlparse کار کند) - if not proxy_str.startswith("http"): - temp_proxy = "http://" + proxy_str - else: - temp_proxy = proxy_str - + if not proxy_str.startswith("http"): temp_proxy = "http://" + proxy_str + else: temp_proxy = proxy_str parsed = urllib.parse.urlparse(temp_proxy) - - return { - "server": f"http://{parsed.hostname}:{parsed.port}", - "username": parsed.username, - "password": parsed.password - } - except Exception as e: - print(f"⚠️ Proxy parse error: {e}") - return None - - # 2. اگر فرمت ساده دو نقطه‌ای بود (IP:PORT:USER:PASS) - # اول پروتکل‌های احتمالی را حذف می‌کنیم + return {"server": f"http://{parsed.hostname}:{parsed.port}", "username": parsed.username, "password": parsed.password} + except: return None + + # فرمت IP:PORT:USER:PASS clean_str = proxy_str.replace("http://", "").replace("https://", "") parts = clean_str.split(':') - - config = {} - if len(parts) == 2: # IP:PORT - config = {"server": f"http://{parts[0]}:{parts[1]}"} - elif len(parts) == 4: # IP:PORT:USER:PASS - config = { - "server": f"http://{parts[0]}:{parts[1]}", - "username": parts[2], - "password": parts[3] - } - else: - print(f"⚠️ Invalid proxy format: {proxy_str}") - return None - - return config + if len(parts) == 2: return {"server": f"http://{parts[0]}:{parts[1]}"} + elif len(parts) == 4: return {"server": f"http://{parts[0]}:{parts[1]}", "username": parts[2], "password": parts[3]} + return None -def create_context_with_proxy(proxy_data): - """ساخت یک نشست (Session) جدید با پروکسی اختصاصی""" - if not browser: start_browser_engine() - +def create_context(browser_instance, proxy_data): + """کانتکست را روی مرورگر لوکال (داخل تابع) می‌سازد""" proxy_config = parse_proxy(proxy_data) - context = browser.new_context( + context = browser_instance.new_context( user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", viewport={"width": 1280, "height": 800}, - proxy=proxy_config # <--- اعمال پروکسی اینجا انجام می‌شود + proxy=proxy_config ) - # تزریق اسکریپت Stealth + # اسکریپت مخفی کردن اتوماسیون stealth_js = """ Object.defineProperty(navigator, 'webdriver', { get: () => false }); window.chrome = { runtime: {} }; @@ -111,22 +72,15 @@ def create_context_with_proxy(proxy_data): Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] }); """ context.add_init_script(stealth_js) - return context def inject_cookies(context, cookies_data): if not cookies_data: return - if isinstance(cookies_data, str): - cookies_data = json.loads(cookies_data) - + if isinstance(cookies_data, str): cookies_data = json.loads(cookies_data) formatted_cookies = [] for k, v in cookies_data.items(): val = urllib.parse.unquote(str(v)) if k == "sessionid" and "%" in str(v) else str(v) - formatted_cookies.append({ - 'name': k, 'value': val, - 'domain': '.instagram.com', 'path': '/', - 'secure': True, 'sameSite': 'Lax' - }) + formatted_cookies.append({'name': k, 'value': val, 'domain': '.instagram.com', 'path': '/', 'secure': True, 'sameSite': 'Lax'}) context.clear_cookies() context.add_cookies(formatted_cookies) @@ -141,12 +95,9 @@ def health_check(): @app.route('/auto_like', methods=['POST']) def auto_like(): - # استفاده از قفل: اگر کسی داخل باشد، بقیه منتظر می‌مانند (صف) + # اگر کسی مشغول باشد، اینجا منتظر می‌ماند (Queue) with TASK_LOCK: - context = None - page = None try: - # 1. دریافت داده‌ها data = request.json proxy_str = data.get('proxy') cookies = data.get('cookies') @@ -154,63 +105,53 @@ def auto_like(): print(f"Request received for LIKE using proxy: {proxy_str}") - # 2. ساخت کانتکست اختصاصی - context = create_context_with_proxy(proxy_str) - page = context.new_page() - - # 3. تزریق کوکی و رفتن به صفحه - inject_cookies(context, cookies) - - try: - page.goto(post_url, wait_until="networkidle", timeout=60000) - except: pass - - # 4. لاگیک لایک - status = "Failed" - target_btn = None - - # تشخیص وضعیت - for _ in range(20): - if page.locator('svg[aria-label="Unlike"]').count() > 0 or page.locator('svg[aria-label="نپسندیدن"]').count() > 0: - status = "Already Liked" - break + # اجرای مرورگر به صورت LOCAL داخل همین ترد (رفع ارور Greenlet) + with sync_playwright() as p: + browser = p.chromium.launch(executable_path=CHROMIUM_PATH, headless=True, args=get_browser_args()) - likes = page.locator('svg[aria-label="Like"]') - if likes.count() == 0: likes = page.locator('svg[aria-label="پسندیدن"]') - - if likes.count() > 0 and likes.first.is_visible(): - target_btn = likes.first - break - page.wait_for_timeout(1000) + context = create_context(browser, proxy_str) + inject_cookies(context, cookies) + page = context.new_page() - if status == "Already Liked": - pass - elif target_btn: - target_btn.click(force=True) - page.wait_for_timeout(2000) - if page.locator('svg[aria-label="Unlike"]').count() > 0: - status = "Success: Liked [Verified]" + try: page.goto(post_url, wait_until="networkidle", timeout=60000) + except: pass + + status = "Failed" + target_btn = None + + # لاگیک لایک + for _ in range(20): + if page.locator('svg[aria-label="Unlike"]').count() > 0 or page.locator('svg[aria-label="نپسندیدن"]').count() > 0: + status = "Already Liked"; break + likes = page.locator('svg[aria-label="Like"]') + if likes.count() == 0: likes = page.locator('svg[aria-label="پسندیدن"]') + if likes.count() > 0 and likes.first.is_visible(): + target_btn = likes.first; break + page.wait_for_timeout(1000) + + if status == "Already Liked": pass + elif target_btn: + target_btn.click(force=True) + page.wait_for_timeout(2000) + if page.locator('svg[aria-label="Unlike"]').count() > 0: + status = "Success: Liked [Verified]" + else: + status = "Success: Clicked (No Verify)" else: - status = "Success: Clicked (No Verify)" - else: - page.mouse.dblclick(page.viewport_size['width']/2, page.viewport_size['height']/2) - status = "Success: DoubleTap" + page.mouse.dblclick(page.viewport_size['width']/2, page.viewport_size['height']/2) + status = "Success: DoubleTap" - page.screenshot(path="current_view.png") - return jsonify({"status": status, "proxy_used": proxy_str}) + page.screenshot(path="current_view.png") + browser.close() # بستن مرورگر در پایان کار + return jsonify({"status": status, "proxy_used": proxy_str}) except Exception as e: traceback.print_exc() return jsonify({"error": str(e)}), 500 - finally: - if context: context.close() - @app.route('/auto_comment', methods=['POST']) def auto_comment(): with TASK_LOCK: - context = None - page = None try: data = request.json proxy_str = data.get('proxy') @@ -220,79 +161,73 @@ def auto_comment(): print(f"Request received for COMMENT using proxy: {proxy_str}") - context = create_context_with_proxy(proxy_str) - page = context.new_page() - inject_cookies(context, cookies) - - try: - page.goto(post_url, wait_until="networkidle", timeout=60000) - except: pass - - if page.locator("input[name='username']").count() > 0: - return jsonify({"status": "Failed", "reason": "Login Redirect"}), 401 - - # لاگیک کامنت - bubbles = [page.locator(s) for s in ['svg[aria-label="Comment"]', 'svg[aria-label="محاوره"]', 'svg[class*="_ab6-"]']] - bubbles.append(page.get_by_role("button").filter(has_text="Comment")) - - for b in bubbles: - if b.count() > 0 and b.first.is_visible(): - b.first.click(force=True) - break - - input_ready = False - for _ in range(30): - placeholders = page.get_by_text("Add a comment...", exact=False) - if placeholders.count() > 0 and placeholders.last.is_visible(): - placeholders.last.click(force=True) - input_ready = True - break - forms = page.locator("form") - if forms.count() > 0 and forms.last.is_visible(): - forms.last.click(force=True) - input_ready = True - break - page.wait_for_timeout(1000) - - status = "Failed" - if input_ready: - page.wait_for_timeout(1000) - candidates = [ - page.locator('div[contenteditable="true"][role="textbox"]').last, - page.locator('form textarea').last - ] - final_box = next((c for c in candidates if c.is_visible()), None) + with sync_playwright() as p: + browser = p.chromium.launch(executable_path=CHROMIUM_PATH, headless=True, args=get_browser_args()) - if final_box: - final_box.click() - final_box.type(comment_text, delay=100) - page.wait_for_timeout(500) - - post_btn = page.locator('div[role="button"]').filter(has_text="Post").last - if not post_btn.is_visible(): post_btn = page.locator('div[role="button"]').filter(has_text="ارسال").last - - if post_btn.is_visible(): - post_btn.click() - status = "Success: Posted [Verified]" - else: - page.keyboard.press("Enter") - status = "Success: Enter [Verified]" + context = create_context(browser, proxy_str) + inject_cookies(context, cookies) + page = context.new_page() + + try: page.goto(post_url, wait_until="networkidle", timeout=60000) + except: pass - page.screenshot(path="current_view.png") - return jsonify({"status": status, "proxy_used": proxy_str}) + if page.locator("input[name='username']").count() > 0: + browser.close() + return jsonify({"status": "Failed", "reason": "Login Redirect"}), 401 + + # لاگیک کامنت + bubbles = [page.locator(s) for s in ['svg[aria-label="Comment"]', 'svg[aria-label="محاوره"]', 'svg[class*="_ab6-"]']] + bubbles.append(page.get_by_role("button").filter(has_text="Comment")) + + for b in bubbles: + if b.count() > 0 and b.first.is_visible(): + b.first.click(force=True); break + + input_ready = False + for _ in range(30): + placeholders = page.get_by_text("Add a comment...", exact=False) + if placeholders.count() > 0 and placeholders.last.is_visible(): + placeholders.last.click(force=True); input_ready = True; break + forms = page.locator("form") + if forms.count() > 0 and forms.last.is_visible(): + forms.last.click(force=True); input_ready = True; break + page.wait_for_timeout(1000) + + status = "Failed" + if input_ready: + page.wait_for_timeout(1000) + candidates = [ + page.locator('div[contenteditable="true"][role="textbox"]').last, + page.locator('form textarea').last + ] + final_box = next((c for c in candidates if c.is_visible()), None) + + if final_box: + final_box.click() + final_box.type(comment_text, delay=100) + page.wait_for_timeout(500) + + post_btn = page.locator('div[role="button"]').filter(has_text="Post").last + if not post_btn.is_visible(): post_btn = page.locator('div[role="button"]').filter(has_text="ارسال").last + + if post_btn.is_visible(): + post_btn.click() + status = "Success: Posted [Verified]" + else: + page.keyboard.press("Enter") + status = "Success: Enter [Verified]" + + page.screenshot(path="current_view.png") + browser.close() + return jsonify({"status": status, "proxy_used": proxy_str}) except Exception as e: traceback.print_exc() return jsonify({"error": str(e)}), 500 - finally: - if context: context.close() - @app.route('/auto_follow', methods=['POST']) def auto_follow(): with TASK_LOCK: - context = None - page = None try: data = request.json proxy_str = data.get('proxy') @@ -301,64 +236,60 @@ def auto_follow(): print(f"Request received for FOLLOW using proxy: {proxy_str}") - context = create_context_with_proxy(proxy_str) - page = context.new_page() - inject_cookies(context, cookies) - - try: - page.goto(f"https://www.instagram.com/{target_user}/", wait_until="networkidle", timeout=60000) - except: pass + with sync_playwright() as p: + browser = p.chromium.launch(executable_path=CHROMIUM_PATH, headless=True, args=get_browser_args()) + + context = create_context(browser, proxy_str) + inject_cookies(context, cookies) + page = context.new_page() - status = "Failed" - target_btn = None - done_keys = ["Following", "Requested", "دنبال شد", "درخواست شد"] - follow_keys = ["Follow", "Follow Back", "دنبال کردن"] - - for _ in range(20): - buttons = page.locator("header button") - if buttons.count() == 0: buttons = page.locator("button") - found = False - for i in range(buttons.count()): - btn = buttons.nth(i) - if not btn.is_visible(): continue - txt = btn.text_content().strip() - - if any(k in txt for k in done_keys): - status = f"Already: {txt}" - found = True; break - - if ("Message" in txt or "پیام" in txt) and page.get_by_text("Follow", exact=True).count() == 0: - pass + try: page.goto(f"https://www.instagram.com/{target_user}/", wait_until="networkidle", timeout=60000) + except: pass - if any(k == txt or k in txt for k in follow_keys) and "Following" not in txt: - target_btn = btn - found = True; break - if found: break - page.wait_for_timeout(1000) + status = "Failed" + target_btn = None + done_keys = ["Following", "Requested", "دنبال شد", "درخواست شد"] + follow_keys = ["Follow", "Follow Back", "دنبال کردن"] + + # لاگیک فالو + for _ in range(20): + buttons = page.locator("header button") + if buttons.count() == 0: buttons = page.locator("button") + found = False + for i in range(buttons.count()): + btn = buttons.nth(i) + if not btn.is_visible(): continue + txt = btn.text_content().strip() + + if any(k in txt for k in done_keys): + status = f"Already: {txt}"; found = True; break + + if ("Message" in txt or "پیام" in txt) and page.get_by_text("Follow", exact=True).count() == 0: pass - if "Already" in status: - pass - elif target_btn: - target_btn.click(force=True) - for _ in range(15): + if any(k == txt or k in txt for k in follow_keys) and "Following" not in txt: + target_btn = btn; found = True; break + if found: break page.wait_for_timeout(1000) - full_text = page.locator("header").text_content() - if any(k in full_text for k in done_keys): - status = "Success: Followed [Verified]" - break - if "Success" not in status: status = "Success: Clicked (Timeout)" - - page.screenshot(path="current_view.png") - return jsonify({"status": status, "target": target_user}) + + if "Already" in status: pass + elif target_btn: + target_btn.click(force=True) + for _ in range(15): + page.wait_for_timeout(1000) + full_text = page.locator("header").text_content() + if any(k in full_text for k in done_keys): + status = "Success: Followed [Verified]"; break + if "Success" not in status: status = "Success: Clicked (Timeout)" + + page.screenshot(path="current_view.png") + browser.close() + return jsonify({"status": status, "target": target_user}) except Exception as e: return jsonify({"error": str(e)}), 500 - finally: - if context: context.close() @app.route('/screenshot') def serve_screenshot(): - # اسکرین شات پشت قفل نمی‌ماند if os.path.exists("current_view.png"): return send_file("current_view.png", mimetype='image/png') return "No image" @@ -394,9 +325,7 @@ def auto_update_loop(): print("[UPDATER] Restarting application...\n") # 5. Restart - if playwright_instance: - playwright_instance.stop() - + # اینجا دیگر نیازی به بستن دستی مرورگر نیست چون در هر درخواست بسته می‌شود os.execv(sys.executable, [sys.executable] + sys.argv) except Exception as e: @@ -409,7 +338,7 @@ if __name__ == '__main__': update_thread.start() print(">>> Auto-updater started in background...") - start_browser_engine() # استارت اولیه موتور + # در اینجا دیگر استارت اولیه موتور را نداریم چون داخل توابع انجام می‌شود # تغییر مهم: threaded=True فعال شد # درخواست‌های همزمان پذیرفته می‌شوند، اما توابع سنگین با TASK_LOCK صف‌بندی می‌شوند