import os import time import json import urllib.parse import traceback import subprocess import sys import threading from flask import Flask, request, jsonify, send_file from playwright.sync_api import sync_playwright app = Flask(__name__) CHROMIUM_PATH = "/usr/bin/chromium-browser" if not os.path.exists(CHROMIUM_PATH): CHROMIUM_PATH = "/usr/bin/chromium" # --- قفل برای صف‌بندی درخواست‌های سنگین --- # این قفل باعث می‌شود لایک/فالو/کامنت یکی‌یکی انجام شوند TASK_LOCK = threading.Lock() # مرورگر اصلی (فقط یک بار اجرا می‌شود) playwright_instance = None browser = None # ======================================================= # 🛠️ توابع کمکی (Helper Functions) # ======================================================= def start_browser_engine(): """موتور کرومیوم را فقط یکبار روشن می‌کند""" global playwright_instance, browser if browser: return print("--- Starting Chromium Engine ---") playwright_instance = sync_playwright().start() browser = playwright_instance.chromium.launch( executable_path=CHROMIUM_PATH, headless=True, # در سرور واقعی True باشد args=[ "--no-sandbox", "--disable-dev-shm-usage", "--disable-blink-features=AutomationControlled", "--disable-gpu" ] ) def parse_proxy(proxy_str): """پشتیبانی هوشمند از انواع فرمت‌های پروکسی""" if not proxy_str: return None proxy_str = proxy_str.strip() # 1. اگر فرمت URL بود (مثل چیزی که فرستادی: user:pass@ip:port) if "@" in proxy_str: try: # اضافه کردن http اگر نداشته باشد (برای اینکه urlparse کار کند) if not proxy_str.startswith("http"): temp_proxy = "http://" + proxy_str else: temp_proxy = proxy_str parsed = urllib.parse.urlparse(temp_proxy) return { "server": f"http://{parsed.hostname}:{parsed.port}", "username": parsed.username, "password": parsed.password } except Exception as e: print(f"⚠️ Proxy parse error: {e}") return None # 2. اگر فرمت ساده دو نقطه‌ای بود (IP:PORT:USER:PASS) # اول پروتکل‌های احتمالی را حذف می‌کنیم clean_str = proxy_str.replace("http://", "").replace("https://", "") parts = clean_str.split(':') config = {} if len(parts) == 2: # IP:PORT config = {"server": f"http://{parts[0]}:{parts[1]}"} elif len(parts) == 4: # IP:PORT:USER:PASS config = { "server": f"http://{parts[0]}:{parts[1]}", "username": parts[2], "password": parts[3] } else: print(f"⚠️ Invalid proxy format: {proxy_str}") return None return config def create_context_with_proxy(proxy_data): """ساخت یک نشست (Session) جدید با پروکسی اختصاصی""" if not browser: start_browser_engine() proxy_config = parse_proxy(proxy_data) context = browser.new_context( user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", viewport={"width": 1280, "height": 800}, proxy=proxy_config # <--- اعمال پروکسی اینجا انجام می‌شود ) # تزریق اسکریپت Stealth stealth_js = """ Object.defineProperty(navigator, 'webdriver', { get: () => false }); window.chrome = { runtime: {} }; Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] }); """ context.add_init_script(stealth_js) return context def inject_cookies(context, cookies_data): if not cookies_data: return if isinstance(cookies_data, str): cookies_data = json.loads(cookies_data) formatted_cookies = [] for k, v in cookies_data.items(): val = urllib.parse.unquote(str(v)) if k == "sessionid" and "%" in str(v) else str(v) formatted_cookies.append({ 'name': k, 'value': val, 'domain': '.instagram.com', 'path': '/', 'secure': True, 'sameSite': 'Lax' }) context.clear_cookies() context.add_cookies(formatted_cookies) # ======================================================= # 🚀 مسیرها (Routes) # ======================================================= @app.route('/') def health_check(): """این مسیر همیشه سریع جواب می‌دهد، حتی اگر ربات مشغول باشد""" return "OK - Server is Alive", 200 @app.route('/auto_like', methods=['POST']) def auto_like(): # استفاده از قفل: اگر کسی داخل باشد، بقیه منتظر می‌مانند (صف) with TASK_LOCK: context = None page = None try: # 1. دریافت داده‌ها data = request.json proxy_str = data.get('proxy') cookies = data.get('cookies') post_url = data.get('post_url') print(f"Request received for LIKE using proxy: {proxy_str}") # 2. ساخت کانتکست اختصاصی context = create_context_with_proxy(proxy_str) page = context.new_page() # 3. تزریق کوکی و رفتن به صفحه inject_cookies(context, cookies) try: page.goto(post_url, wait_until="networkidle", timeout=60000) except: pass # 4. لاگیک لایک status = "Failed" target_btn = None # تشخیص وضعیت for _ in range(20): if page.locator('svg[aria-label="Unlike"]').count() > 0 or page.locator('svg[aria-label="نپسندیدن"]').count() > 0: status = "Already Liked" break likes = page.locator('svg[aria-label="Like"]') if likes.count() == 0: likes = page.locator('svg[aria-label="پسندیدن"]') if likes.count() > 0 and likes.first.is_visible(): target_btn = likes.first break page.wait_for_timeout(1000) if status == "Already Liked": pass elif target_btn: target_btn.click(force=True) page.wait_for_timeout(2000) if page.locator('svg[aria-label="Unlike"]').count() > 0: status = "Success: Liked [Verified]" else: status = "Success: Clicked (No Verify)" else: page.mouse.dblclick(page.viewport_size['width']/2, page.viewport_size['height']/2) status = "Success: DoubleTap" page.screenshot(path="current_view.png") return jsonify({"status": status, "proxy_used": proxy_str}) except Exception as e: traceback.print_exc() return jsonify({"error": str(e)}), 500 finally: if context: context.close() @app.route('/auto_comment', methods=['POST']) def auto_comment(): with TASK_LOCK: context = None page = None try: data = request.json proxy_str = data.get('proxy') cookies = data.get('cookies') post_url = data.get('post_url') comment_text = data.get('comment_text') print(f"Request received for COMMENT using proxy: {proxy_str}") context = create_context_with_proxy(proxy_str) page = context.new_page() inject_cookies(context, cookies) try: page.goto(post_url, wait_until="networkidle", timeout=60000) except: pass if page.locator("input[name='username']").count() > 0: return jsonify({"status": "Failed", "reason": "Login Redirect"}), 401 # لاگیک کامنت bubbles = [page.locator(s) for s in ['svg[aria-label="Comment"]', 'svg[aria-label="محاوره"]', 'svg[class*="_ab6-"]']] bubbles.append(page.get_by_role("button").filter(has_text="Comment")) for b in bubbles: if b.count() > 0 and b.first.is_visible(): b.first.click(force=True) break input_ready = False for _ in range(30): placeholders = page.get_by_text("Add a comment...", exact=False) if placeholders.count() > 0 and placeholders.last.is_visible(): placeholders.last.click(force=True) input_ready = True break forms = page.locator("form") if forms.count() > 0 and forms.last.is_visible(): forms.last.click(force=True) input_ready = True break page.wait_for_timeout(1000) status = "Failed" if input_ready: page.wait_for_timeout(1000) candidates = [ page.locator('div[contenteditable="true"][role="textbox"]').last, page.locator('form textarea').last ] final_box = next((c for c in candidates if c.is_visible()), None) if final_box: final_box.click() final_box.type(comment_text, delay=100) page.wait_for_timeout(500) post_btn = page.locator('div[role="button"]').filter(has_text="Post").last if not post_btn.is_visible(): post_btn = page.locator('div[role="button"]').filter(has_text="ارسال").last if post_btn.is_visible(): post_btn.click() status = "Success: Posted [Verified]" else: page.keyboard.press("Enter") status = "Success: Enter [Verified]" page.screenshot(path="current_view.png") return jsonify({"status": status, "proxy_used": proxy_str}) except Exception as e: traceback.print_exc() return jsonify({"error": str(e)}), 500 finally: if context: context.close() @app.route('/auto_follow', methods=['POST']) def auto_follow(): with TASK_LOCK: context = None page = None try: data = request.json proxy_str = data.get('proxy') cookies = data.get('cookies') target_user = data.get('target_username') print(f"Request received for FOLLOW using proxy: {proxy_str}") context = create_context_with_proxy(proxy_str) page = context.new_page() inject_cookies(context, cookies) try: page.goto(f"https://www.instagram.com/{target_user}/", wait_until="networkidle", timeout=60000) except: pass status = "Failed" target_btn = None done_keys = ["Following", "Requested", "دنبال شد", "درخواست شد"] follow_keys = ["Follow", "Follow Back", "دنبال کردن"] for _ in range(20): buttons = page.locator("header button") if buttons.count() == 0: buttons = page.locator("button") found = False for i in range(buttons.count()): btn = buttons.nth(i) if not btn.is_visible(): continue txt = btn.text_content().strip() if any(k in txt for k in done_keys): status = f"Already: {txt}" found = True; break if ("Message" in txt or "پیام" in txt) and page.get_by_text("Follow", exact=True).count() == 0: pass if any(k == txt or k in txt for k in follow_keys) and "Following" not in txt: target_btn = btn found = True; break if found: break page.wait_for_timeout(1000) if "Already" in status: pass elif target_btn: target_btn.click(force=True) for _ in range(15): page.wait_for_timeout(1000) full_text = page.locator("header").text_content() if any(k in full_text for k in done_keys): status = "Success: Followed [Verified]" break if "Success" not in status: status = "Success: Clicked (Timeout)" page.screenshot(path="current_view.png") return jsonify({"status": status, "target": target_user}) except Exception as e: return jsonify({"error": str(e)}), 500 finally: if context: context.close() @app.route('/screenshot') def serve_screenshot(): # اسکرین شات پشت قفل نمی‌ماند if os.path.exists("current_view.png"): return send_file("current_view.png", mimetype='image/png') return "No image" # ======================================================= # 🔄 تابع آپدیت خودکار # ======================================================= def auto_update_loop(): """هر 60 ثانیه چک می‌کند اگر آپدیت آمده بود، برنامه را ریستارت می‌کند""" repo_dir = os.path.dirname(os.path.abspath(__file__)) while True: try: time.sleep(60) # هر 1 دقیقه چک کن # 1. Fetch subprocess.check_call(["git", "fetch"], cwd=repo_dir, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # 2. Compare local_hash = subprocess.check_output(["git", "rev-parse", "HEAD"], cwd=repo_dir).strip() remote_hash = subprocess.check_output(["git", "rev-parse", "@{u}"], cwd=repo_dir).strip() if local_hash != remote_hash: print("\n[UPDATER] New version found! Updating...") # 3. Pull subprocess.check_call(["git", "pull"], cwd=repo_dir) # 4. Install Dependencies (Optional) req_file = os.path.join(repo_dir, "requirements.txt") if os.path.exists(req_file): subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]) print("[UPDATER] Restarting application...\n") # 5. Restart if playwright_instance: playwright_instance.stop() os.execv(sys.executable, [sys.executable] + sys.argv) except Exception as e: print(f"[UPDATER ERROR]: {e}") time.sleep(60) if __name__ == '__main__': # --- شروع ترد آپدیت کننده در پس‌زمینه --- update_thread = threading.Thread(target=auto_update_loop, daemon=True) update_thread.start() print(">>> Auto-updater started in background...") start_browser_engine() # استارت اولیه موتور # تغییر مهم: threaded=True فعال شد # درخواست‌های همزمان پذیرفته می‌شوند، اما توابع سنگین با TASK_LOCK صف‌بندی می‌شوند app.run(host='0.0.0.0', port=5000, threaded=True)