Add Threading and Lock(debug)

This commit is contained in:
amirhossein 2026-01-07 08:33:10 -08:00
parent 1bf944a0cf
commit 8adc69aaa2

369
main.py
View File

@ -11,99 +11,60 @@ from playwright.sync_api import sync_playwright
app = Flask(__name__) app = Flask(__name__)
# مسیر کرومیوم
CHROMIUM_PATH = "/usr/bin/chromium-browser" CHROMIUM_PATH = "/usr/bin/chromium-browser"
if not os.path.exists(CHROMIUM_PATH): if not os.path.exists(CHROMIUM_PATH):
CHROMIUM_PATH = "/usr/bin/chromium" CHROMIUM_PATH = "/usr/bin/chromium"
# --- قفل برای صف‌بندی درخواست‌های سنگین --- # --- قفل برای صف‌بندی درخواست‌های سنگین ---
# این قفل باعث می‌شود لایک/فالو/کامنت یکی‌یکی انجام شوند # این قفل باعث می‌شود لایک/فالو/کامنت یکی‌یکی انجام شوند اما سرور برای پینگ آزاد باشد
TASK_LOCK = threading.Lock() TASK_LOCK = threading.Lock()
# مرورگر اصلی (فقط یک بار اجرا می‌شود)
playwright_instance = None
browser = None
# ======================================================= # =======================================================
# 🛠️ توابع کمکی (Helper Functions) # 🛠️ توابع کمکی (Helper Functions)
# ======================================================= # =======================================================
def start_browser_engine(): def get_browser_args():
"""موتور کرومیوم را فقط یکبار روشن می‌کند""" """تنظیمات بهینه برای اجرا در سرور"""
global playwright_instance, browser return [
if browser: return "--no-sandbox",
"--disable-dev-shm-usage",
print("--- Starting Chromium Engine ---") "--disable-blink-features=AutomationControlled",
playwright_instance = sync_playwright().start() "--disable-gpu"
browser = playwright_instance.chromium.launch( ]
executable_path=CHROMIUM_PATH,
headless=True, # در سرور واقعی True باشد
args=[
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-blink-features=AutomationControlled",
"--disable-gpu"
]
)
def parse_proxy(proxy_str): def parse_proxy(proxy_str):
"""پشتیبانی هوشمند از انواع فرمت‌های پروکسی""" """پشتیبانی هوشمند از انواع فرمت‌های پروکسی"""
if not proxy_str: return None if not proxy_str: return None
proxy_str = proxy_str.strip() proxy_str = proxy_str.strip()
# 1. اگر فرمت URL بود (مثل چیزی که فرستادی: user:pass@ip:port) # فرمت URL
if "@" in proxy_str: if "@" in proxy_str:
try: try:
# اضافه کردن http اگر نداشته باشد (برای اینکه urlparse کار کند) if not proxy_str.startswith("http"): temp_proxy = "http://" + proxy_str
if not proxy_str.startswith("http"): else: temp_proxy = proxy_str
temp_proxy = "http://" + proxy_str
else:
temp_proxy = proxy_str
parsed = urllib.parse.urlparse(temp_proxy) parsed = urllib.parse.urlparse(temp_proxy)
return {"server": f"http://{parsed.hostname}:{parsed.port}", "username": parsed.username, "password": parsed.password}
except: return None
return { # فرمت IP:PORT:USER:PASS
"server": f"http://{parsed.hostname}:{parsed.port}",
"username": parsed.username,
"password": parsed.password
}
except Exception as e:
print(f"⚠️ Proxy parse error: {e}")
return None
# 2. اگر فرمت ساده دو نقطه‌ای بود (IP:PORT:USER:PASS)
# اول پروتکل‌های احتمالی را حذف می‌کنیم
clean_str = proxy_str.replace("http://", "").replace("https://", "") clean_str = proxy_str.replace("http://", "").replace("https://", "")
parts = clean_str.split(':') parts = clean_str.split(':')
if len(parts) == 2: return {"server": f"http://{parts[0]}:{parts[1]}"}
elif len(parts) == 4: return {"server": f"http://{parts[0]}:{parts[1]}", "username": parts[2], "password": parts[3]}
return None
config = {} def create_context(browser_instance, proxy_data):
if len(parts) == 2: # IP:PORT """کانتکست را روی مرورگر لوکال (داخل تابع) می‌سازد"""
config = {"server": f"http://{parts[0]}:{parts[1]}"}
elif len(parts) == 4: # IP:PORT:USER:PASS
config = {
"server": f"http://{parts[0]}:{parts[1]}",
"username": parts[2],
"password": parts[3]
}
else:
print(f"⚠️ Invalid proxy format: {proxy_str}")
return None
return config
def create_context_with_proxy(proxy_data):
"""ساخت یک نشست (Session) جدید با پروکسی اختصاصی"""
if not browser: start_browser_engine()
proxy_config = parse_proxy(proxy_data) proxy_config = parse_proxy(proxy_data)
context = browser.new_context( context = browser_instance.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
viewport={"width": 1280, "height": 800}, viewport={"width": 1280, "height": 800},
proxy=proxy_config # <--- اعمال پروکسی اینجا انجام می‌شود proxy=proxy_config
) )
# تزریق اسکریپت Stealth # اسکریپت مخفی کردن اتوماسیون
stealth_js = """ stealth_js = """
Object.defineProperty(navigator, 'webdriver', { get: () => false }); Object.defineProperty(navigator, 'webdriver', { get: () => false });
window.chrome = { runtime: {} }; window.chrome = { runtime: {} };
@ -111,22 +72,15 @@ def create_context_with_proxy(proxy_data):
Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] }); Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
""" """
context.add_init_script(stealth_js) context.add_init_script(stealth_js)
return context return context
def inject_cookies(context, cookies_data): def inject_cookies(context, cookies_data):
if not cookies_data: return if not cookies_data: return
if isinstance(cookies_data, str): if isinstance(cookies_data, str): cookies_data = json.loads(cookies_data)
cookies_data = json.loads(cookies_data)
formatted_cookies = [] formatted_cookies = []
for k, v in cookies_data.items(): for k, v in cookies_data.items():
val = urllib.parse.unquote(str(v)) if k == "sessionid" and "%" in str(v) else str(v) val = urllib.parse.unquote(str(v)) if k == "sessionid" and "%" in str(v) else str(v)
formatted_cookies.append({ formatted_cookies.append({'name': k, 'value': val, 'domain': '.instagram.com', 'path': '/', 'secure': True, 'sameSite': 'Lax'})
'name': k, 'value': val,
'domain': '.instagram.com', 'path': '/',
'secure': True, 'sameSite': 'Lax'
})
context.clear_cookies() context.clear_cookies()
context.add_cookies(formatted_cookies) context.add_cookies(formatted_cookies)
@ -141,12 +95,9 @@ def health_check():
@app.route('/auto_like', methods=['POST']) @app.route('/auto_like', methods=['POST'])
def auto_like(): def auto_like():
# استفاده از قفل: اگر کسی داخل باشد، بقیه منتظر می‌مانند (صف) # اگر کسی مشغول باشد، اینجا منتظر می‌ماند (Queue)
with TASK_LOCK: with TASK_LOCK:
context = None
page = None
try: try:
# 1. دریافت داده‌ها
data = request.json data = request.json
proxy_str = data.get('proxy') proxy_str = data.get('proxy')
cookies = data.get('cookies') cookies = data.get('cookies')
@ -154,63 +105,53 @@ def auto_like():
print(f"Request received for LIKE using proxy: {proxy_str}") print(f"Request received for LIKE using proxy: {proxy_str}")
# 2. ساخت کانتکست اختصاصی # اجرای مرورگر به صورت LOCAL داخل همین ترد (رفع ارور Greenlet)
context = create_context_with_proxy(proxy_str) with sync_playwright() as p:
page = context.new_page() browser = p.chromium.launch(executable_path=CHROMIUM_PATH, headless=True, args=get_browser_args())
# 3. تزریق کوکی و رفتن به صفحه context = create_context(browser, proxy_str)
inject_cookies(context, cookies) inject_cookies(context, cookies)
page = context.new_page()
try: try: page.goto(post_url, wait_until="networkidle", timeout=60000)
page.goto(post_url, wait_until="networkidle", timeout=60000) except: pass
except: pass
# 4. لاگیک لایک status = "Failed"
status = "Failed" target_btn = None
target_btn = None
# تشخیص وضعیت # لاگیک لایک
for _ in range(20): for _ in range(20):
if page.locator('svg[aria-label="Unlike"]').count() > 0 or page.locator('svg[aria-label="نپسندیدن"]').count() > 0: if page.locator('svg[aria-label="Unlike"]').count() > 0 or page.locator('svg[aria-label="نپسندیدن"]').count() > 0:
status = "Already Liked" status = "Already Liked"; break
break likes = page.locator('svg[aria-label="Like"]')
if likes.count() == 0: likes = page.locator('svg[aria-label="پسندیدن"]')
if likes.count() > 0 and likes.first.is_visible():
target_btn = likes.first; break
page.wait_for_timeout(1000)
likes = page.locator('svg[aria-label="Like"]') if status == "Already Liked": pass
if likes.count() == 0: likes = page.locator('svg[aria-label="پسندیدن"]') elif target_btn:
target_btn.click(force=True)
if likes.count() > 0 and likes.first.is_visible(): page.wait_for_timeout(2000)
target_btn = likes.first if page.locator('svg[aria-label="Unlike"]').count() > 0:
break status = "Success: Liked [Verified]"
page.wait_for_timeout(1000) else:
status = "Success: Clicked (No Verify)"
if status == "Already Liked":
pass
elif target_btn:
target_btn.click(force=True)
page.wait_for_timeout(2000)
if page.locator('svg[aria-label="Unlike"]').count() > 0:
status = "Success: Liked [Verified]"
else: else:
status = "Success: Clicked (No Verify)" page.mouse.dblclick(page.viewport_size['width']/2, page.viewport_size['height']/2)
else: status = "Success: DoubleTap"
page.mouse.dblclick(page.viewport_size['width']/2, page.viewport_size['height']/2)
status = "Success: DoubleTap"
page.screenshot(path="current_view.png") page.screenshot(path="current_view.png")
return jsonify({"status": status, "proxy_used": proxy_str}) browser.close() # بستن مرورگر در پایان کار
return jsonify({"status": status, "proxy_used": proxy_str})
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
return jsonify({"error": str(e)}), 500 return jsonify({"error": str(e)}), 500
finally:
if context: context.close()
@app.route('/auto_comment', methods=['POST']) @app.route('/auto_comment', methods=['POST'])
def auto_comment(): def auto_comment():
with TASK_LOCK: with TASK_LOCK:
context = None
page = None
try: try:
data = request.json data = request.json
proxy_str = data.get('proxy') proxy_str = data.get('proxy')
@ -220,79 +161,73 @@ def auto_comment():
print(f"Request received for COMMENT using proxy: {proxy_str}") print(f"Request received for COMMENT using proxy: {proxy_str}")
context = create_context_with_proxy(proxy_str) with sync_playwright() as p:
page = context.new_page() browser = p.chromium.launch(executable_path=CHROMIUM_PATH, headless=True, args=get_browser_args())
inject_cookies(context, cookies)
try: context = create_context(browser, proxy_str)
page.goto(post_url, wait_until="networkidle", timeout=60000) inject_cookies(context, cookies)
except: pass page = context.new_page()
if page.locator("input[name='username']").count() > 0: try: page.goto(post_url, wait_until="networkidle", timeout=60000)
return jsonify({"status": "Failed", "reason": "Login Redirect"}), 401 except: pass
# لاگیک کامنت if page.locator("input[name='username']").count() > 0:
bubbles = [page.locator(s) for s in ['svg[aria-label="Comment"]', 'svg[aria-label="محاوره"]', 'svg[class*="_ab6-"]']] browser.close()
bubbles.append(page.get_by_role("button").filter(has_text="Comment")) return jsonify({"status": "Failed", "reason": "Login Redirect"}), 401
for b in bubbles: # لاگیک کامنت
if b.count() > 0 and b.first.is_visible(): bubbles = [page.locator(s) for s in ['svg[aria-label="Comment"]', 'svg[aria-label="محاوره"]', 'svg[class*="_ab6-"]']]
b.first.click(force=True) bubbles.append(page.get_by_role("button").filter(has_text="Comment"))
break
input_ready = False for b in bubbles:
for _ in range(30): if b.count() > 0 and b.first.is_visible():
placeholders = page.get_by_text("Add a comment...", exact=False) b.first.click(force=True); break
if placeholders.count() > 0 and placeholders.last.is_visible():
placeholders.last.click(force=True)
input_ready = True
break
forms = page.locator("form")
if forms.count() > 0 and forms.last.is_visible():
forms.last.click(force=True)
input_ready = True
break
page.wait_for_timeout(1000)
status = "Failed" input_ready = False
if input_ready: for _ in range(30):
page.wait_for_timeout(1000) placeholders = page.get_by_text("Add a comment...", exact=False)
candidates = [ if placeholders.count() > 0 and placeholders.last.is_visible():
page.locator('div[contenteditable="true"][role="textbox"]').last, placeholders.last.click(force=True); input_ready = True; break
page.locator('form textarea').last forms = page.locator("form")
] if forms.count() > 0 and forms.last.is_visible():
final_box = next((c for c in candidates if c.is_visible()), None) forms.last.click(force=True); input_ready = True; break
page.wait_for_timeout(1000)
if final_box: status = "Failed"
final_box.click() if input_ready:
final_box.type(comment_text, delay=100) page.wait_for_timeout(1000)
page.wait_for_timeout(500) candidates = [
page.locator('div[contenteditable="true"][role="textbox"]').last,
page.locator('form textarea').last
]
final_box = next((c for c in candidates if c.is_visible()), None)
post_btn = page.locator('div[role="button"]').filter(has_text="Post").last if final_box:
if not post_btn.is_visible(): post_btn = page.locator('div[role="button"]').filter(has_text="ارسال").last final_box.click()
final_box.type(comment_text, delay=100)
page.wait_for_timeout(500)
if post_btn.is_visible(): post_btn = page.locator('div[role="button"]').filter(has_text="Post").last
post_btn.click() if not post_btn.is_visible(): post_btn = page.locator('div[role="button"]').filter(has_text="ارسال").last
status = "Success: Posted [Verified]"
else:
page.keyboard.press("Enter")
status = "Success: Enter [Verified]"
page.screenshot(path="current_view.png") if post_btn.is_visible():
return jsonify({"status": status, "proxy_used": proxy_str}) post_btn.click()
status = "Success: Posted [Verified]"
else:
page.keyboard.press("Enter")
status = "Success: Enter [Verified]"
page.screenshot(path="current_view.png")
browser.close()
return jsonify({"status": status, "proxy_used": proxy_str})
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
return jsonify({"error": str(e)}), 500 return jsonify({"error": str(e)}), 500
finally:
if context: context.close()
@app.route('/auto_follow', methods=['POST']) @app.route('/auto_follow', methods=['POST'])
def auto_follow(): def auto_follow():
with TASK_LOCK: with TASK_LOCK:
context = None
page = None
try: try:
data = request.json data = request.json
proxy_str = data.get('proxy') proxy_str = data.get('proxy')
@ -301,64 +236,60 @@ def auto_follow():
print(f"Request received for FOLLOW using proxy: {proxy_str}") print(f"Request received for FOLLOW using proxy: {proxy_str}")
context = create_context_with_proxy(proxy_str) with sync_playwright() as p:
page = context.new_page() browser = p.chromium.launch(executable_path=CHROMIUM_PATH, headless=True, args=get_browser_args())
inject_cookies(context, cookies)
try: context = create_context(browser, proxy_str)
page.goto(f"https://www.instagram.com/{target_user}/", wait_until="networkidle", timeout=60000) inject_cookies(context, cookies)
except: pass page = context.new_page()
status = "Failed" try: page.goto(f"https://www.instagram.com/{target_user}/", wait_until="networkidle", timeout=60000)
target_btn = None except: pass
done_keys = ["Following", "Requested", "دنبال شد", "درخواست شد"]
follow_keys = ["Follow", "Follow Back", "دنبال کردن"]
for _ in range(20): status = "Failed"
buttons = page.locator("header button") target_btn = None
if buttons.count() == 0: buttons = page.locator("button") done_keys = ["Following", "Requested", "دنبال شد", "درخواست شد"]
found = False follow_keys = ["Follow", "Follow Back", "دنبال کردن"]
for i in range(buttons.count()):
btn = buttons.nth(i)
if not btn.is_visible(): continue
txt = btn.text_content().strip()
if any(k in txt for k in done_keys): # لاگیک فالو
status = f"Already: {txt}" for _ in range(20):
found = True; break buttons = page.locator("header button")
if buttons.count() == 0: buttons = page.locator("button")
found = False
for i in range(buttons.count()):
btn = buttons.nth(i)
if not btn.is_visible(): continue
txt = btn.text_content().strip()
if ("Message" in txt or "پیام" in txt) and page.get_by_text("Follow", exact=True).count() == 0: if any(k in txt for k in done_keys):
pass status = f"Already: {txt}"; found = True; break
if any(k == txt or k in txt for k in follow_keys) and "Following" not in txt: if ("Message" in txt or "پیام" in txt) and page.get_by_text("Follow", exact=True).count() == 0: pass
target_btn = btn
found = True; break
if found: break
page.wait_for_timeout(1000)
if "Already" in status: if any(k == txt or k in txt for k in follow_keys) and "Following" not in txt:
pass target_btn = btn; found = True; break
elif target_btn: if found: break
target_btn.click(force=True)
for _ in range(15):
page.wait_for_timeout(1000) page.wait_for_timeout(1000)
full_text = page.locator("header").text_content()
if any(k in full_text for k in done_keys):
status = "Success: Followed [Verified]"
break
if "Success" not in status: status = "Success: Clicked (Timeout)"
page.screenshot(path="current_view.png") if "Already" in status: pass
return jsonify({"status": status, "target": target_user}) elif target_btn:
target_btn.click(force=True)
for _ in range(15):
page.wait_for_timeout(1000)
full_text = page.locator("header").text_content()
if any(k in full_text for k in done_keys):
status = "Success: Followed [Verified]"; break
if "Success" not in status: status = "Success: Clicked (Timeout)"
page.screenshot(path="current_view.png")
browser.close()
return jsonify({"status": status, "target": target_user})
except Exception as e: except Exception as e:
return jsonify({"error": str(e)}), 500 return jsonify({"error": str(e)}), 500
finally:
if context: context.close()
@app.route('/screenshot') @app.route('/screenshot')
def serve_screenshot(): def serve_screenshot():
# اسکرین شات پشت قفل نمی‌ماند
if os.path.exists("current_view.png"): if os.path.exists("current_view.png"):
return send_file("current_view.png", mimetype='image/png') return send_file("current_view.png", mimetype='image/png')
return "No image" return "No image"
@ -394,9 +325,7 @@ def auto_update_loop():
print("[UPDATER] Restarting application...\n") print("[UPDATER] Restarting application...\n")
# 5. Restart # 5. Restart
if playwright_instance: # اینجا دیگر نیازی به بستن دستی مرورگر نیست چون در هر درخواست بسته می‌شود
playwright_instance.stop()
os.execv(sys.executable, [sys.executable] + sys.argv) os.execv(sys.executable, [sys.executable] + sys.argv)
except Exception as e: except Exception as e:
@ -409,7 +338,7 @@ if __name__ == '__main__':
update_thread.start() update_thread.start()
print(">>> Auto-updater started in background...") print(">>> Auto-updater started in background...")
start_browser_engine() # استارت اولیه موتور # در اینجا دیگر استارت اولیه موتور را نداریم چون داخل توابع انجام می‌شود
# تغییر مهم: threaded=True فعال شد # تغییر مهم: threaded=True فعال شد
# درخواست‌های همزمان پذیرفته می‌شوند، اما توابع سنگین با TASK_LOCK صف‌بندی می‌شوند # درخواست‌های همزمان پذیرفته می‌شوند، اما توابع سنگین با TASK_LOCK صف‌بندی می‌شوند