commit a8749e3239bc3c117b752f9f7874f5209379a602 Author: root Date: Wed Jan 7 14:14:47 2026 +0000 Initial code upload diff --git a/main.py b/main.py new file mode 100644 index 0000000..985e5e1 --- /dev/null +++ b/main.py @@ -0,0 +1,402 @@ +import os +import time +import json +import urllib.parse +import traceback +import subprocess # <--- اضافه شد +import sys # <--- اضافه شد +import threading # <--- اضافه شد +from flask import Flask, request, jsonify, send_file +from playwright.sync_api import sync_playwright + +app = Flask(__name__) + +CHROMIUM_PATH = "/usr/bin/chromium-browser" +if not os.path.exists(CHROMIUM_PATH): + CHROMIUM_PATH = "/usr/bin/chromium" + +# مرورگر اصلی (فقط یک بار اجرا می‌شود) +playwright_instance = None +browser = None + +# ======================================================= +# 🛠️ توابع کمکی (Helper Functions) +# ======================================================= + +def start_browser_engine(): + """موتور کرومیوم را فقط یکبار روشن می‌کند""" + global playwright_instance, browser + if browser: return + + print("--- Starting Chromium Engine ---") + playwright_instance = sync_playwright().start() + browser = playwright_instance.chromium.launch( + executable_path=CHROMIUM_PATH, + headless=True, # در سرور واقعی True باشد + args=[ + "--no-sandbox", + "--disable-dev-shm-usage", + "--disable-blink-features=AutomationControlled", + "--disable-gpu" + ] + ) + +def parse_proxy(proxy_str): + """پشتیبانی هوشمند از انواع فرمت‌های پروکسی""" + if not proxy_str: return None + + proxy_str = proxy_str.strip() + + # 1. اگر فرمت URL بود (مثل چیزی که فرستادی: user:pass@ip:port) + if "@" in proxy_str: + try: + # اضافه کردن http اگر نداشته باشد (برای اینکه urlparse کار کند) + if not proxy_str.startswith("http"): + temp_proxy = "http://" + proxy_str + else: + temp_proxy = proxy_str + + parsed = urllib.parse.urlparse(temp_proxy) + + return { + "server": f"http://{parsed.hostname}:{parsed.port}", + "username": parsed.username, + "password": parsed.password + } + except Exception as e: + print(f"⚠️ Proxy parse error: {e}") + return None + + # 2. اگر فرمت ساده دو نقطه‌ای بود (IP:PORT:USER:PASS) + # اول پروتکل‌های احتمالی را حذف می‌کنیم + clean_str = proxy_str.replace("http://", "").replace("https://", "") + parts = clean_str.split(':') + + config = {} + if len(parts) == 2: # IP:PORT + config = {"server": f"http://{parts[0]}:{parts[1]}"} + elif len(parts) == 4: # IP:PORT:USER:PASS + config = { + "server": f"http://{parts[0]}:{parts[1]}", + "username": parts[2], + "password": parts[3] + } + else: + print(f"⚠️ Invalid proxy format: {proxy_str}") + return None + + return config + +def create_context_with_proxy(proxy_data): + """ساخت یک نشست (Session) جدید با پروکسی اختصاصی""" + if not browser: start_browser_engine() + + proxy_config = parse_proxy(proxy_data) + + context = browser.new_context( + user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + viewport={"width": 1280, "height": 800}, + proxy=proxy_config # <--- اعمال پروکسی اینجا انجام می‌شود + ) + + # تزریق اسکریپت Stealth + stealth_js = """ + Object.defineProperty(navigator, 'webdriver', { get: () => false }); + window.chrome = { runtime: {} }; + Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); + Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] }); + """ + context.add_init_script(stealth_js) + + return context + +def inject_cookies(context, cookies_data): + if not cookies_data: return + if isinstance(cookies_data, str): + cookies_data = json.loads(cookies_data) + + formatted_cookies = [] + for k, v in cookies_data.items(): + val = urllib.parse.unquote(str(v)) if k == "sessionid" and "%" in str(v) else str(v) + formatted_cookies.append({ + 'name': k, 'value': val, + 'domain': '.instagram.com', 'path': '/', + 'secure': True, 'sameSite': 'Lax' + }) + context.clear_cookies() + context.add_cookies(formatted_cookies) + +# ======================================================= +# 🚀 مسیرها (Routes) +# ======================================================= + +@app.route('/auto_like', methods=['POST']) +def auto_like(): + context = None + page = None + try: + # 1. دریافت داده‌ها + data = request.json + proxy_str = data.get('proxy') # <--- گرفتن پروکسی از ورودی + cookies = data.get('cookies') + post_url = data.get('post_url') + + print(f"Request received for LIKE using proxy: {proxy_str}") + + # 2. ساخت کانتکست اختصاصی + context = create_context_with_proxy(proxy_str) + page = context.new_page() + + # 3. تزریق کوکی و رفتن به صفحه + inject_cookies(context, cookies) + + try: + page.goto(post_url, wait_until="networkidle", timeout=60000) + except: pass + + # 4. لاگیک لایک (همان کد هوشمند قبلی) + status = "Failed" + target_btn = None + + # تشخیص وضعیت + for _ in range(20): + if page.locator('svg[aria-label="Unlike"]').count() > 0 or page.locator('svg[aria-label="نپسندیدن"]').count() > 0: + status = "Already Liked" + break + + likes = page.locator('svg[aria-label="Like"]') + if likes.count() == 0: likes = page.locator('svg[aria-label="پسندیدن"]') + + if likes.count() > 0 and likes.first.is_visible(): + target_btn = likes.first + break + page.wait_for_timeout(1000) + + if status == "Already Liked": + pass + elif target_btn: + target_btn.click(force=True) + page.wait_for_timeout(2000) + if page.locator('svg[aria-label="Unlike"]').count() > 0: + status = "Success: Liked [Verified]" + else: + status = "Success: Clicked (No Verify)" + else: + page.mouse.dblclick(page.viewport_size['width']/2, page.viewport_size['height']/2) + status = "Success: DoubleTap" + + page.screenshot(path="current_view.png") + return jsonify({"status": status, "proxy_used": proxy_str}) + + except Exception as e: + traceback.print_exc() + return jsonify({"error": str(e)}), 500 + finally: + # بسیار مهم: بستن کانتکست برای آزاد کردن رم و آی‌پی + if context: context.close() + + +@app.route('/auto_comment', methods=['POST']) +def auto_comment(): + context = None + page = None + try: + data = request.json + proxy_str = data.get('proxy') + cookies = data.get('cookies') + post_url = data.get('post_url') + comment_text = data.get('comment_text') + + print(f"Request received for COMMENT using proxy: {proxy_str}") + + context = create_context_with_proxy(proxy_str) + page = context.new_page() + inject_cookies(context, cookies) + + try: + page.goto(post_url, wait_until="networkidle", timeout=60000) + except: pass + + if page.locator("input[name='username']").count() > 0: + return jsonify({"status": "Failed", "reason": "Login Redirect"}), 401 + + # لاگیک کامنت (همان کد نهایی و وریفای شده) + bubbles = [page.locator(s) for s in ['svg[aria-label="Comment"]', 'svg[aria-label="محاوره"]', 'svg[class*="_ab6-"]']] + bubbles.append(page.get_by_role("button").filter(has_text="Comment")) + + for b in bubbles: + if b.count() > 0 and b.first.is_visible(): + b.first.click(force=True) + break + + input_ready = False + for _ in range(30): + placeholders = page.get_by_text("Add a comment...", exact=False) + if placeholders.count() > 0 and placeholders.last.is_visible(): + placeholders.last.click(force=True) + input_ready = True + break + forms = page.locator("form") + if forms.count() > 0 and forms.last.is_visible(): + forms.last.click(force=True) + input_ready = True + break + page.wait_for_timeout(1000) + + status = "Failed" + if input_ready: + page.wait_for_timeout(1000) + candidates = [ + page.locator('div[contenteditable="true"][role="textbox"]').last, + page.locator('form textarea').last + ] + final_box = next((c for c in candidates if c.is_visible()), None) + + if final_box: + final_box.click() + final_box.type(comment_text, delay=100) + page.wait_for_timeout(500) + + post_btn = page.locator('div[role="button"]').filter(has_text="Post").last + if not post_btn.is_visible(): post_btn = page.locator('div[role="button"]').filter(has_text="ارسال").last + + if post_btn.is_visible(): + post_btn.click() + status = "Success: Posted [Verified]" + else: + page.keyboard.press("Enter") + status = "Success: Enter [Verified]" + + page.screenshot(path="current_view.png") + return jsonify({"status": status, "proxy_used": proxy_str}) + + except Exception as e: + traceback.print_exc() + return jsonify({"error": str(e)}), 500 + finally: + if context: context.close() + + +@app.route('/auto_follow', methods=['POST']) +def auto_follow(): + context = None + page = None + try: + data = request.json + proxy_str = data.get('proxy') + cookies = data.get('cookies') + target_user = data.get('target_username') + + print(f"Request received for FOLLOW using proxy: {proxy_str}") + + context = create_context_with_proxy(proxy_str) + page = context.new_page() + inject_cookies(context, cookies) + + try: + page.goto(f"https://www.instagram.com/{target_user}/", wait_until="networkidle", timeout=60000) + except: pass + + status = "Failed" + target_btn = None + done_keys = ["Following", "Requested", "دنبال شد", "درخواست شد"] + follow_keys = ["Follow", "Follow Back", "دنبال کردن"] + + for _ in range(20): + buttons = page.locator("header button") + if buttons.count() == 0: buttons = page.locator("button") + found = False + for i in range(buttons.count()): + btn = buttons.nth(i) + if not btn.is_visible(): continue + txt = btn.text_content().strip() + + if any(k in txt for k in done_keys): + status = f"Already: {txt}" + found = True; break + + if ("Message" in txt or "پیام" in txt) and page.get_by_text("Follow", exact=True).count() == 0: + pass + + if any(k == txt or k in txt for k in follow_keys) and "Following" not in txt: + target_btn = btn + found = True; break + if found: break + page.wait_for_timeout(1000) + + if "Already" in status: + pass + elif target_btn: + target_btn.click(force=True) + for _ in range(15): + page.wait_for_timeout(1000) + full_text = page.locator("header").text_content() + if any(k in full_text for k in done_keys): + status = "Success: Followed [Verified]" + break + if "Success" not in status: status = "Success: Clicked (Timeout)" + + page.screenshot(path="current_view.png") + return jsonify({"status": status, "target": target_user}) + + except Exception as e: + return jsonify({"error": str(e)}), 500 + finally: + if context: context.close() + +@app.route('/screenshot') +def serve_screenshot(): + if os.path.exists("current_view.png"): + return send_file("current_view.png", mimetype='image/png') + return "No image" + +# ======================================================= +# 🔄 تابع آپدیت خودکار (اضافه شده) +# ======================================================= +def auto_update_loop(): + """هر 60 ثانیه چک می‌کند اگر آپدیت آمده بود، برنامه را ریستارت می‌کند""" + repo_dir = os.path.dirname(os.path.abspath(__file__)) + while True: + try: + time.sleep(60) # هر 1 دقیقه چک کن + + # 1. Fetch + subprocess.check_call(["git", "fetch"], cwd=repo_dir, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + # 2. Compare + local_hash = subprocess.check_output(["git", "rev-parse", "HEAD"], cwd=repo_dir).strip() + remote_hash = subprocess.check_output(["git", "rev-parse", "@{u}"], cwd=repo_dir).strip() + + if local_hash != remote_hash: + print("\n[UPDATER] New version found! Updating...") + + # 3. Pull + subprocess.check_call(["git", "pull"], cwd=repo_dir) + + # 4. Install Dependencies (Optional) + req_file = os.path.join(repo_dir, "requirements.txt") + if os.path.exists(req_file): + subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]) + + print("[UPDATER] Restarting application...\n") + + # 5. Restart + # بستن مرورگر قبل از ریستارت (اختیاری ولی تمیزتر) + if playwright_instance: + playwright_instance.stop() + + os.execv(sys.executable, [sys.executable] + sys.argv) + + except Exception as e: + print(f"[UPDATER ERROR]: {e}") + time.sleep(60) # اگر خطا داد، صبر کن و دوباره تلاش کن + +if __name__ == '__main__': + # --- شروع ترد آپدیت کننده در پس‌زمینه --- + update_thread = threading.Thread(target=auto_update_loop, daemon=True) + update_thread.start() + print(">>> Auto-updater started in background...") + + # حتما threaded=False باشد تا تداخل در Playwright پیش نیاید + start_browser_engine() # استارت اولیه موتور + app.run(host='0.0.0.0', port=5000, threaded=False)