diff --git a/OnlySnap.py b/OnlySnap.py index 6b22ae1..fabefb5 100644 --- a/OnlySnap.py +++ b/OnlySnap.py @@ -37,7 +37,7 @@ DMR_DIR = os.path.join(BASE_DIR, "dmr") #logs -DEBUG_MODE = False +DEBUG_MODE = False DEBUG_FILE = os.path.join(DMR_DIR, "debug.log") CURRENT_VERSION = "1.0.5" @@ -89,22 +89,22 @@ def run_mass_download(self, username, progress_callback): try: clean_username = username.replace("@", "").strip() config = load_config() - + # Settings disable_cover_highlights = config['settings']['disable_cover_highlights'] disable_download_txt = config['settings']['disable_download_post_with_txt'] download_tagged = config['settings']['download_tagged_posts'] download_labels_option = config['settings'].get('download_labels', False) merge_tagged = config['settings']['merge_tagged_media'] - + PROFILE_INFO = get_user_info(clean_username) PROFILE = PROFILE_INFO['username'] PROFILE_ID = str(PROFILE_INFO["id"]) new_files = 0 - + assure_dir("Profiles/" + PROFILE) assure_dir("Profiles/" + PROFILE + "/Public") - + # Dump info with clean raw_about = PROFILE_INFO.get("about") or "" clean_about = re.sub(r'<[^>]+>', ' ', raw_about) @@ -120,11 +120,11 @@ def run_mass_download(self, username, progress_callback): "website": PROFILE_INFO.get("website"), "location": PROFILE_INFO.get("location") } - + if sinf.get("joinDate"): - try: + try: sinf["joinDate"] = datetime.datetime.strptime(sinf["joinDate"], "%Y-%m-%dT%H:%M:%S+00:00").strftime("%Y-%m-%d") - except: + except: pass sinf = {k: v for k, v in sinf.items() if v is not None} @@ -133,7 +133,7 @@ def run_mass_download(self, username, progress_callback): check_and_update_profile_cache(PROFILE_ID) download_public_files() - count_public = new_files + count_public = new_files stories_list = [] if not self.stop_requested: @@ -156,12 +156,12 @@ def run_mass_download(self, username, progress_callback): # Posts Retrieval photo_posts = read_from_cache(PROFILE_ID, "photos") or [] - if not photo_posts: + if not photo_posts: raw = api_request(f"/users/{PROFILE_ID}/posts/photos", getdata={"limit": "999999"}) if isinstance(raw, dict) and 'list' in raw: raw = raw['list'] photo_posts = get_all_photos(raw) update_profile_cache(PROFILE_ID, "photos", photo_posts) - + video_posts = read_from_cache(PROFILE_ID, "videos") or [] if not video_posts: raw = api_request(f"/users/{PROFILE_ID}/posts/videos", getdata={"limit": "999999"}) @@ -175,18 +175,18 @@ def run_mass_download(self, username, progress_callback): if isinstance(raw, dict) and 'list' in raw: raw = raw['list'] archived_posts = get_all_archived(raw) update_profile_cache(PROFILE_ID, "archived", archived_posts) - + stream_posts = read_from_cache(PROFILE_ID, "streams") or [] if not stream_posts: raw = api_request(f"/users/{PROFILE_ID}/posts/streams", getdata={"limit": "999999"}) - if isinstance(raw, dict) and 'list' in raw: raw = raw['list'] + if isinstance(raw, dict) and 'list' in raw: raw = raw['list'] stream_posts = get_all_streams(raw) update_profile_cache(PROFILE_ID, "streams", stream_posts) seen_post_ids = set() unique_counts = { 'archived': 0, 'stream': 0, 'video': 0, 'photo': 0 } skipped_ads_count = 0 - final_download_list = [] + final_download_list = [] prioritized_lists = [ (archived_posts, 'archived'), @@ -202,21 +202,21 @@ def run_mass_download(self, username, progress_callback): if not post.get("canViewMedia", True): continue pid = str(post.get("id")) - if pid in seen_post_ids: continue - + if pid in seen_post_ids: continue + text = post.get("text") or "" tags = ["#adv", "#ad", "#advertising", "#ad24", "#ads", "spin", "#Advertisement"] is_spam = any(tag in (text.lower() if text else "") for tag in tags) - + if is_spam and not download_tagged: skipped_ads_count += 1 - seen_post_ids.add(pid) + seen_post_ids.add(pid) continue media_count = len(post.get("media", [])) if media_count > 0: - seen_post_ids.add(pid) - unique_counts[category] += media_count + seen_post_ids.add(pid) + unique_counts[category] += media_count is_arch = (category == 'archived') is_str = (category == 'stream') final_download_list.append((post, is_arch, is_str)) @@ -224,7 +224,7 @@ def run_mass_download(self, username, progress_callback): c_stories = 0 for s in stories_list: if s.get("canView", True): c_stories += len(s.get('media', [])) - + c_chats = 0 for c in chats_list: if c.get("canView", True): c_chats += len(c.get('media', [])) @@ -232,7 +232,7 @@ def run_mass_download(self, username, progress_callback): c_highlights_files = 0 c_highlights_covers = 0 if not disable_cover_highlights: c_highlights_covers = len(highlights_list) - + if highlights_list: self.log(f"...Analyzing Highlights (Please wait)...") for h_folder in highlights_list: @@ -243,7 +243,7 @@ def run_mass_download(self, username, progress_callback): if isinstance(details, dict): h_stories = details.get("stories", []) for s in h_stories: - if s.get("canView", True): + if s.get("canView", True): c_highlights_files += len(s.get("media", [])) except: pass @@ -251,7 +251,7 @@ def run_mass_download(self, username, progress_callback): total_global_files = total_post_files + c_stories + c_chats + c_highlights_files + c_highlights_covers + count_public if self.clear_log: self.clear_log() - + if total_global_files == 0: self.log(f"Profile @{PROFILE} has no content available.") progress_callback(100, 100, "No content") @@ -264,7 +264,7 @@ def run_mass_download(self, username, progress_callback): if unique_counts['archived'] > 0: self.log(f"ARCHIVED: {unique_counts['archived']}") if c_stories > 0: self.log(f"STORIES: {c_stories}") if c_chats > 0: self.log(f"MESSAGES: {c_chats}") - + if c_highlights_files > 0 or c_highlights_covers > 0: hl_msg = f"HIGHLIGHTS: {c_highlights_files}" if c_highlights_covers > 0: hl_msg += f" (+ {c_highlights_covers} Covers)" @@ -276,17 +276,17 @@ def run_mass_download(self, username, progress_callback): self.log(f"Found {total_global_files} total files.") if skipped_ads_count > 0: self.log(f"Skipped {skipped_ads_count} SPAM/AD posts.") self.log("--------------------------------") - + if download_labels_option: - self.log(f"--> Starting Labels...") + self.log(f"--> Starting Labels...") else: - self.log(f"--> Starting download sync...") + self.log(f"--> Starting download sync...") if not disable_download_txt: assure_dir(f"Profiles/{PROFILE}/Media") self.current_file_progress = count_public - + def on_file_dl(): self.current_file_progress += 1 curr = min(self.current_file_progress, total_global_files) @@ -294,11 +294,11 @@ def on_file_dl(): if count_public > 0: progress_callback(count_public, total_global_files, f"Downloading @{PROFILE}") - label_map = {} - + label_map = {} + if download_labels_option and not self.stop_requested: labels_list = get_all_labels() - + if labels_list: for label in labels_list: if self.stop_requested: break @@ -317,7 +317,7 @@ def on_file_dl(): for m in lp.get("media", []): if m['type'] == 'photo': has_pic = True elif m['type'] in ['video', 'gif']: has_vid = True - + is_mixed = has_vid and has_pic base_label_path = f"Profiles/{PROFILE}/Media/Labels/{lname}/" @@ -328,6 +328,8 @@ def on_file_dl(): "mixed": is_mixed } + drm_skipped_count = 0 + with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: futures = [] @@ -341,7 +343,7 @@ def on_file_dl(): source_url = None files = media.get("files", {}) cf_cookies = None - + if "drm" in files and files["drm"]: source_url = files["drm"].get("manifest", {}).get("dash") signature = files["drm"].get("signature", {}).get("dash", {}) @@ -349,23 +351,23 @@ def on_file_dl(): p = signature.get("CloudFront-Policy") s = signature.get("CloudFront-Signature") k = signature.get("CloudFront-Key-Pair-Id") - if p and s and k: + if p and s and k: cf_cookies = f"CloudFront-Policy={p}; CloudFront-Signature={s}; CloudFront-Key-Pair-Id={k}" - + if not source_url: source_url = files.get("full", {}).get("url") or files.get("preview", {}).get("url") - + if source_url: p = cv if media['type'] in ['video', 'gif'] else cp futures.append(executor.submit( - download_media, - media, - False, - path=p, + download_media, + media, + False, + path=p, source_url=source_url, - post_id=chat_id, + post_id=chat_id, specific_cookies=cf_cookies, - is_chat=True + is_chat=True )) # B. STORIES @@ -376,31 +378,34 @@ def on_file_dl(): if src: futures.append(executor.submit(download_media, media, False, path=f"Profiles/{PROFILE}/Media/Stories/", source_url=src)) - # C. HIGHLIGHTS + # C. HIGHLIGHTS if highlights_list: download_highlights({"list": highlights_list}, file_callback=on_file_dl) # D. POSTS (STANDARD + LABEL OVERRIDE) for post, is_arch, is_str in final_download_list: - if self.stop_requested: break - + if self.stop_requested: + break + pid = str(post.get("id")) text = post.get("text") or "" - + post_ts_unix = float(post.get("postedAtPrecise", time.time())) post_date = dt.datetime.fromtimestamp(post_ts_unix) post_date_str = post_date.strftime('%Y-%m-%dT%H_%M_%S') - contains_tags = any(tag in (text.lower()) for tag in ["#adv", "#ad", "spin", "Advertisement"]) + contains_tags = any(tag in (text.lower()) for tag in ["#adv", "#ad", "spin", "Advertisement"]) path_override = None is_in_label = False # Check Tagged if download_tagged and contains_tags: base_tag = f"Profiles/{PROFILE}/Media/Tag-Post" - if merge_tagged: path_override = f"/{base_tag}/" - else: path_override = "TAG_SPLIT" - + if merge_tagged: + path_override = f"/{base_tag}/" + else: + path_override = "TAG_SPLIT" + # Check Label (if active via settings) elif pid in label_map: is_in_label = True @@ -416,27 +421,31 @@ def on_file_dl(): if is_in_label: txt_dir = label_map[pid]['base'] elif path_override == "TAG_SPLIT" or (download_tagged and contains_tags): - txt_dir = f"Profiles/{PROFILE}/Media/Posts/{post_date_str}" - elif path_override is None: # Standard Post - txt_dir = f"Profiles/{PROFILE}/Media/Posts/{post_date_str}" - + txt_dir = f"Profiles/{PROFILE}/Media/Posts/{post_date_str}" + elif path_override is None: + txt_dir = f"Profiles/{PROFILE}/Media/Posts/{post_date_str}" + if txt_dir: assure_dir(txt_dir) - with open(f"{txt_dir}/_text.txt", "w", encoding='utf-8') as f: f.write(text) + with open(f"{txt_dir}/_text.txt", "w", encoding='utf-8') as f: + f.write(text) for media in post.get("media", []): current_media_path = None - + if path_override == "TAG_SPLIT": current_media_path = f"/Profiles/{PROFILE}/Media/Tag-Post/{media['type']}s/" elif path_override and path_override.startswith("LABEL_MIXED|"): real_base = path_override.split("|")[1] - if media['type'] == 'photo': current_media_path = real_base + "Photos/" - elif media['type'] in ['video', 'gif']: current_media_path = real_base + "Videos/" - else: current_media_path = real_base + if media['type'] == 'photo': + current_media_path = real_base + "Photos/" + elif media['type'] in ['video', 'gif']: + current_media_path = real_base + "Videos/" + else: + current_media_path = real_base else: - current_media_path = path_override - # URL & DRM + current_media_path = path_override + source_url = None files = media.get("files", {}) cf_cookies = None @@ -447,34 +456,34 @@ def on_file_dl(): p = signature.get("CloudFront-Policy") s = signature.get("CloudFront-Signature") k = signature.get("CloudFront-Key-Pair-Id") - if p and s and k: cf_cookies = f"CloudFront-Policy={p}; CloudFront-Signature={s}; CloudFront-Key-Pair-Id={k}" + if p and s and k: + cf_cookies = f"CloudFront-Policy={p}; CloudFront-Signature={s}; CloudFront-Key-Pair-Id={k}" if not source_url: source_url = files.get("source", {}).get("url") or files.get("full", {}).get("url") if source_url: futures.append(executor.submit( - download_media, - media, - is_arch, + download_media, + media, + is_arch, current_media_path, - post_date, - is_str, - source_url, - pid, + post_date, + is_str, + source_url, + pid, cf_cookies )) - drm_skipped_count = 0 - - for f in as_completed(futures): - try: - result = f.result() - if result == "DRM_FAILED": - drm_skipped_count += 1 - elif result == True: - new_files += 1 - on_file_dl() - except: pass + for f in as_completed(futures): + try: + result = f.result() + if result == "DRM_FAILED": + drm_skipped_count += 1 + elif result: + new_files += 1 + on_file_dl() + except: + pass # --- FINISH --- progress_callback(total_global_files, total_global_files, "Completed") @@ -487,19 +496,18 @@ def on_file_dl(): self.log("SYNC COMPLETED.") self.log(f"- Total files scanned: {total_global_files}") self.log(f"- New files downloaded: {new_files}") - + # --- AGGIUNGI IL MESSAGGIO QUI --- if drm_skipped_count > 0: self.log(f"⚠️ SKIPPED {drm_skipped_count} DRM VIDEOS.") self.log("Key server is busy or offline. They will be downloaded next time.") - + self.log("------------------------------------------------") except Exception as e: self.log(f"CRITICAL ERROR: {str(e)}") traceback.print_exc() - class SettingsScreen(Screen): BINDINGS = [("escape", "cancel", "Close")] CSS = """ @@ -614,6 +622,273 @@ def esegui_auto_save(self): except Exception as e: pass + +class ConfirmReplaceAuthScreen(Screen): + BINDINGS = [("escape", "cancel", "Cancel")] + CSS = """ + ConfirmReplaceAuthScreen { + align: center middle; + background: rgba(0,0,0,0.7); + } + + #confirm_container { + width: 70; + height: auto; + background: #1f1d2e; + border: heavy #ebbcba; + padding: 2; + } + + #confirm_buttons { + height: auto; + margin-top: 1; + } + + #confirm_buttons > Button { + width: 1fr; + margin-right: 1; + } + + #confirm_yes { + background: #eb6f92; + color: #e0def4; + } + + #confirm_no { + background: #31748f; + color: #e0def4; + margin-right: 0; + } + """ + + def compose(self) -> ComposeResult: + with Container(id="confirm_container"): + yield Label("Saved cookies were detected.", classes="info_sub") + yield Label("Do you want to replace the saved session with a new login?") + with Horizontal(id="confirm_buttons"): + yield Button("YES, REPLACE", id="confirm_yes") + yield Button("NO", id="confirm_no") + + @on(Button.Pressed, "#confirm_yes") + def confirm_yes(self): + try: + clear_saved_auth() + except Exception: + pass + self.app.pop_screen() + if hasattr(self.app, "start_login_flow"): + self.app.start_login_flow() + + @on(Button.Pressed, "#confirm_no") + def confirm_no(self): + self.app.pop_screen() + if hasattr(self.app, "cancel_login_flow"): + self.app.cancel_login_flow() + + def action_cancel(self): + self.confirm_no() + + +def check_missing_deps(): + """Retorna lista de dependencias faltantes con info para descargarlas.""" + missing = [] + + # N_m3u8DL-RE + if not os.path.isfile(local_downloader): + if system == "Windows": + dl_url = "https://github.com/nilaoda/N_m3u8DL-RE/releases/download/v0.5.1-beta/N_m3u8DL-RE_v0.5.1-beta_win-x64_20251029.zip" + dl_type = "zip" + elif system == "Darwin": + dl_url = "https://github.com/nilaoda/N_m3u8DL-RE/releases/download/v0.5.1-beta/N_m3u8DL-RE_v0.5.1-beta_osx-x64_20251029.tar.gz" + dl_type = "tar" + else: + dl_url = "https://github.com/nilaoda/N_m3u8DL-RE/releases/download/v0.5.1-beta/N_m3u8DL-RE_v0.5.1-beta_linux-x64_20251029.tar.gz" + dl_type = "tar" + missing.append({"name": "N_m3u8DL-RE", "dest": local_downloader, "url": dl_url, "type": dl_type}) + + # mp4decrypt + if not os.path.isfile(local_mp4decrypt): + if system == "Windows": + dl_url = "https://www.bok.net/Bento4/binaries/Bento4-SDK-1-6-0-641.x86_64-microsoft-win32.zip" + bin_name = "mp4decrypt.exe" + elif system == "Darwin": + dl_url = "https://www.bok.net/Bento4/binaries/Bento4-SDK-1-6-0-641.universal-apple-macosx.zip" + bin_name = "mp4decrypt" + else: + dl_url = "https://www.bok.net/Bento4/binaries/Bento4-SDK-1-6-0-641.x86_64-unknown-linux.zip" + bin_name = "mp4decrypt" + missing.append({"name": "mp4decrypt", "dest": local_mp4decrypt, "url": dl_url, "type": "zip_bento4", "bin_name": bin_name}) + + # ffmpeg + if not os.path.isfile(local_ffmpeg) and not shutil.which("ffmpeg"): + missing.append({"name": "ffmpeg", "dest": None, "url": None, "type": "system"}) + + return missing + + +def install_dep(dep_info, log_func=None): + """Descarga e instala una dependencia en DMR_DIR.""" + os.makedirs(DMR_DIR, exist_ok=True) + dep_name = dep_info["name"] + dep_url = dep_info.get("url") + dep_dest = dep_info.get("dest") + dep_type = dep_info.get("type") + + if not dep_url or not dep_dest: + return False + + try: + if log_func: log_func(f"Downloading {dep_name}...") + zip_path = os.path.join(DMR_DIR, f"_tmp_{dep_name}.download") + response = requests.get(dep_url, stream=True, timeout=60) + response.raise_for_status() + with open(zip_path, "wb") as zip_file: + for chunk in response.iter_content(chunk_size=8192): + zip_file.write(chunk) + + if log_func: log_func(f"Extracting {dep_name}...") + + if dep_type == "zip": + import zipfile + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + for zip_member in zip_ref.namelist(): + if os.path.basename(zip_member) == os.path.basename(dep_dest): + with zip_ref.open(zip_member) as src, open(dep_dest, 'wb') as dst: + dst.write(src.read()) + break + + elif dep_type == "tar": + import tarfile + with tarfile.open(zip_path, 'r:gz') as tar_ref: + for tar_member in tar_ref.getmembers(): + if os.path.basename(tar_member.name) == os.path.basename(dep_dest): + extracted = tar_ref.extractfile(tar_member) + if extracted: + with open(dep_dest, 'wb') as dst: + dst.write(extracted.read()) + break + + elif dep_type == "zip_bento4": + import zipfile + bin_name = dep_info.get("bin_name", os.path.basename(dep_dest)) + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + for zip_member in zip_ref.namelist(): + if os.path.basename(zip_member) == bin_name: + with zip_ref.open(zip_member) as src, open(dep_dest, 'wb') as dst: + dst.write(src.read()) + break + + os.remove(zip_path) + + if system != "Windows" and dep_dest and os.path.isfile(dep_dest): + os.chmod(dep_dest, 0o755) + + if log_func: log_func(f"{dep_name} installed.") + return True + + except Exception as install_error: + if log_func: log_func(f"Failed to install {dep_name}: {install_error}") + try: + if os.path.exists(zip_path): os.remove(zip_path) + except Exception: + pass + return False + + +class MissingDepsScreen(Screen): + BINDINGS = [("escape", "ignore_all", "Ignore")] + CSS = """ + MissingDepsScreen { + align: center middle; + background: rgba(0,0,0,0.75); + } + #deps_container { + width: 70; + height: auto; + background: #1f1d2e; + border: heavy #ebbcba; + padding: 2; + } + #deps_title { + text-style: bold; + color: #ebbcba; + margin-bottom: 1; + } + #deps_list { + margin-bottom: 1; + color: #e0def4; + } + #deps_buttons { + height: auto; + margin-top: 1; + } + #deps_buttons > Button { + width: 1fr; + margin-right: 1; + } + #btn_install_deps { + background: #eb6f92; + color: #e0def4; + } + #btn_ignore_deps { + background: #31748f; + color: #e0def4; + margin-right: 0; + } + """ + + def __init__(self, missing_deps): + super().__init__() + self.missing_deps = missing_deps + + def compose(self) -> ComposeResult: + dep_names = ", ".join(d["name"] for d in self.missing_deps) + with Container(id="deps_container"): + yield Label("Missing Dependencies", id="deps_title") + yield Label(f"The following tools were not found:\n{dep_names}", id="deps_list") + yield Label("Install them automatically?") + with Horizontal(id="deps_buttons"): + yield Button("INSTALL", id="btn_install_deps") + yield Button("IGNORE", id="btn_ignore_deps") + + @on(Button.Pressed, "#btn_install_deps") + def install_all(self): + app_ref = self.app + missing_deps_copy = list(self.missing_deps) + app_ref.pop_screen() + + def run_installs(): + app_ref.call_from_thread(app_ref.log_msg, "Installing missing dependencies in background...") + for dep_info in missing_deps_copy: + dep_name = dep_info["name"] + if dep_info.get("type") == "system": + app_ref.call_from_thread( + app_ref.log_msg, + f"[{dep_name}] Cannot auto-install — run: sudo apt install ffmpeg" + ) + continue + app_ref.call_from_thread(app_ref.log_msg, f"[{dep_name}] Downloading...") + success = install_dep( + dep_info, + log_func=lambda msg: app_ref.call_from_thread(app_ref.log_msg, msg) + ) + if success: + app_ref.call_from_thread(app_ref.log_msg, f"[{dep_name}] Installed successfully.") + else: + app_ref.call_from_thread(app_ref.log_msg, f"[{dep_name}] Installation failed.") + app_ref.call_from_thread(app_ref.log_msg, "Dependency installation complete.") + + install_thread = threading.Thread(target=run_installs, daemon=True) + install_thread.start() + + @on(Button.Pressed, "#btn_ignore_deps") + def ignore_all(self): + self.app.pop_screen() + + def action_ignore_all(self): + self.ignore_all() + + class OnlySnapTUI(App): CSS = """ Screen { @@ -657,6 +932,7 @@ class OnlySnapTUI(App): #buttons_container > Button { width: 1fr; margin-right: 1; } #btn_refresh { width: 100%; background: #c4a7e7; color: #191724; margin-top: 1; } + #btn_login { width: 100%; background: #f6c177; color: #191724; margin-top: 1; } #btn_dl { background: #31748f; color: #e0def4; } #btn_stop { background: #eb6f92; color: #e0def4; margin-right: 0; } @@ -684,6 +960,7 @@ def compose(self) -> ComposeResult: yield Select([("All", "all"), ("Paid", "Paid"), ("Free", "Free"), ("Trial", "Trial")], value="all", id="filter_type") yield DataTable(id="users_table") yield Button("Refresh List", id="btn_refresh") + yield Button("Login", id="btn_login") with Horizontal(id="bottom_buttons"): yield Button("Settings", id="btn_settings") @@ -710,9 +987,19 @@ def on_mount(self): table = self.query_one(DataTable) table.cursor_type = "row" table.add_columns("Username", "Type") - self.refresh_list() + + if refresh_api_header_from_disk(): + self.refresh_list() + else: + self.query_one("#lbl_status").update("Status: No session - click Login") + self.log_msg("No valid session found. Click Login to sign in.") + self.run_worker(self.check_updates, thread=True) + missing_deps = check_missing_deps() + if missing_deps: + self.push_screen(MissingDepsScreen(missing_deps)) + def log_msg(self, text): try: log_widget = self.query_one(Log) @@ -727,6 +1014,11 @@ def clear_log_console(self): @work(exclusive=True) async def refresh_list(self): + if not refresh_api_header_from_disk(): + self.log_msg("No valid session. Click Login first.") + self.query_one("#lbl_status").update("Status: No session - click Login") + return + self.log_msg("Syncing subscriptions...") try: cached_data = load_from_cache() @@ -743,9 +1035,10 @@ async def refresh_list(self): self.log_msg(f"Synced {len(self.all_subs)} creators (Cache Updated).") else: self.log_msg(f"Synced {len(self.all_subs)} creators.") - - except Exception as e: - self.log_msg("Update error:Please Update Cookies") + + self.query_one("#lbl_status").update("Status: Ready") + except Exception: + self.log_msg("Update error:Please Login again") def update_table(self): table = self.query_one(DataTable) @@ -799,10 +1092,16 @@ def user_selected(self, event): @on(Button.Pressed, "#btn_dl") def start_dl(self): + if not refresh_api_header_from_disk(): + self.log_msg("Login required before downloading.") + self.query_one("#lbl_status").update("Status: No session - click Login") + return + self.query_one(Log).clear() self.query_one("#btn_dl").disabled = True self.query_one("#btn_stop").disabled = False self.query_one("#btn_refresh").disabled = True + self.query_one("#btn_login").disabled = True self.query_one("#search_input").disabled = True self.query_one("#filter_type").disabled = True self.query_one("#users_table").disabled = True @@ -817,6 +1116,60 @@ async def action_refresh(self): self.query_one(Log).clear() self.refresh_list() + def start_login_flow(self): + self.query_one("#btn_login").disabled = True + self.query_one("#lbl_status").update("Status: Login window...") + self.log_msg("Opening Chrome/Chromium login window...") + self.run_worker(self.login_task, thread=True) + + def cancel_login_flow(self): + self.query_one("#btn_login").disabled = False + self.log_msg("Login cancelled.") + if has_valid_auth(): + self.query_one("#lbl_status").update("Status: Ready") + else: + self.query_one("#lbl_status").update("Status: No session - click Login") + + @on(Button.Pressed, "#btn_login") + def action_login(self): + if has_saved_auth_cookies(): + self.query_one("#btn_login").disabled = True + self.push_screen(ConfirmReplaceAuthScreen()) + return + self.start_login_flow() + + def login_task(self): + cmd = [sys.executable, os.path.abspath(__file__), "--login-chrome"] + try: + proc = subprocess.run(cmd, capture_output=True, text=True) + out = (proc.stdout or "").strip() + err = (proc.stderr or "").strip() + self.call_from_thread(self.after_login_task, proc.returncode, out, err) + except Exception as e: + self.call_from_thread(self.after_login_task, 1, "", str(e)) + + def after_login_task(self, code, out, err): + self.query_one("#btn_login").disabled = False + + if out: + for line in out.splitlines()[-8:]: + self.log_msg(line) + if err: + for line in err.splitlines()[-8:]: + self.log_msg(line) + + if code == 0 and refresh_api_header_from_disk(): + check_and_clear_cache_if_user_id_changed() + self.query_one("#lbl_status").update("Status: Logged in") + self.log_msg("Session saved successfully.") + self.refresh_list() + elif code == 0: + self.query_one("#lbl_status").update("Status: Browser closed") + self.log_msg("Chrome/Chromium login mode opened correctly, but this mode does not auto-update Auth.json.") + else: + self.query_one("#lbl_status").update("Status: Login failed") + self.log_msg("Login window closed without a valid session.") + @on(Button.Pressed, "#btn_settings") def open_settings(self): self.push_screen(SettingsScreen()) @@ -854,6 +1207,7 @@ def reset_ui(self): # Re-enable interaction self.query_one("#btn_refresh").disabled = False + self.query_one("#btn_login").disabled = False self.query_one("#search_input").disabled = False self.query_one("#filter_type").disabled = False self.query_one("#users_table").disabled = False @@ -894,6 +1248,8 @@ def request_stop(self): # profile data from /users/ PROFILE_INFO = {} PROFILE_ID = "" +API_HEADER = {} +dynamic_rules = {} def clean_up_empty_folder(folder_path): try: @@ -907,16 +1263,145 @@ def assure_dir(path): if not os.path.isdir(path): os.makedirs(path, exist_ok=True) -def create_auth(): +def get_onlyfans_paths(): current_dir = os.path.dirname(os.path.abspath(__file__)) - auth_file_path = os.path.join(current_dir, "Configs", "OnlyFans", "Auth.json") + config_dir = os.path.join(current_dir, "Configs", "OnlyFans") + auth_file_path = os.path.join(config_dir, "Auth.json") + auth_example_path = os.path.join(config_dir, "Auth.json.example") + config_path = os.path.join(config_dir, "Config.json") + config_example_path = os.path.join(config_dir, "Config.json.example") + return { + "current_dir": current_dir, + "config_dir": config_dir, + "auth_file": auth_file_path, + "auth_example": auth_example_path, + "config_file": config_path, + "config_example": config_example_path, + } + + +def ensure_onlyfans_config_files(): + paths = get_onlyfans_paths() + assure_dir(paths["config_dir"]) + + if not os.path.exists(paths["config_file"]) and os.path.exists(paths["config_example"]): + try: + shutil.copy(paths["config_example"], paths["config_file"]) + except Exception: + pass + + if not os.path.exists(paths["auth_file"]) and os.path.exists(paths["auth_example"]): + try: + shutil.copy(paths["auth_example"], paths["auth_file"]) + except Exception: + pass + + +def load_auth_json(): + paths = get_onlyfans_paths() + auth_file_path = paths["auth_file"] + if not os.path.exists(auth_file_path): + return {} + try: + with open(auth_file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + return data if isinstance(data, dict) else {} + except Exception: + return {} + + +def save_auth_json(auth_update): + ensure_onlyfans_config_files() + paths = get_onlyfans_paths() + current_auth = load_auth_json() + current_auth.update(auth_update) + with open(paths["auth_file"], 'w', encoding='utf-8') as f: + json.dump(current_auth, f, indent=4) + return current_auth + + +def parse_cookie_header(cookie_str): + cookies = {} + if not cookie_str: + return cookies + + parts = cookie_str.split(';') + for part in parts: + if '=' in part: + k, v = part.strip().split('=', 1) + cookies[k.strip()] = v.strip() + return cookies + - with open(auth_file_path) as f: - ljson = json.load(f) +def has_valid_auth(): + auth = load_auth_json() + required = ["user-agent", "user-id", "x-bc", "sess"] + return all(str(auth.get(key, '')).strip() for key in required) + + +def has_saved_auth_cookies(): + auth = load_auth_json() + keys = ["sess", "st", "user-id", "x-bc", "user-agent", "fp", "cf_bm", "cfuvid"] + return any(str(auth.get(key, '')).strip() for key in keys) + + +def clear_saved_auth(): + global API_HEADER + ensure_onlyfans_config_files() + paths = get_onlyfans_paths() + auth = load_auth_json() + + for key in ["user-agent", "user-id", "x-bc", "x-hash", "x-of-rev", "sess", "st", "cf_bm", "cfuvid", "fp"]: + auth.pop(key, None) + + with open(paths["auth_file"], 'w', encoding='utf-8') as f: + json.dump(auth, f, indent=4) + + API_HEADER = {} + + try: + login_profile_dir = os.path.join(CACHE_DIR, "webview_profile") + if os.path.isdir(login_profile_dir): + shutil.rmtree(login_profile_dir) + except Exception: + pass + + try: + user_id_cache_path = os.path.join(paths["config_dir"], "user_id_cache.txt") + if os.path.exists(user_id_cache_path): + os.remove(user_id_cache_path) + except Exception: + pass + + return True + + +def refresh_api_header_from_disk(): + global API_HEADER + try: + auth = create_auth() + API_HEADER = auth if auth else {} + except Exception: + API_HEADER = {} + return bool(API_HEADER and API_HEADER.get("user-id") and API_HEADER.get("x-bc")) + + +def create_auth(): + ensure_onlyfans_config_files() + ljson = load_auth_json() + if not ljson: + return None + + user_agent = str(ljson.get("user-agent", "")).strip() + user_id = str(ljson.get("user-id", "")).strip() + x_bc = str(ljson.get("x-bc", "")).strip() + + if not user_agent or not user_id or not x_bc: + return None cookies = { "sess": ljson.get("sess"), - "auth_id": ljson.get("user-id"), + "auth_id": user_id, "st": ljson.get("st"), "lang": "en", "fp": ljson.get("fp"), @@ -928,10 +1413,10 @@ def create_auth(): return { "Accept": "application/json, text/plain, */*", - "User-Agent": ljson["user-agent"], + "User-Agent": user_agent, "Accept-Encoding": "gzip, deflate", - "user-id": ljson["user-id"], - "x-bc": ljson["x-bc"], + "user-id": user_id, + "x-bc": x_bc, "x-of-rev": ljson.get("x-of-rev", ""), "x-hash": ljson.get("x-hash", ""), "Cookie": cookie_str, @@ -939,17 +1424,44 @@ def create_auth(): } def save_config(config): - current_dir = os.path.dirname(os.path.abspath(__file__)) - config_path = os.path.join(current_dir, "Configs", "OnlyFans", "Config.json") - with open(config_path, 'w') as f: + ensure_onlyfans_config_files() + config_path = get_onlyfans_paths()["config_file"] + with open(config_path, 'w', encoding='utf-8') as f: json.dump(config, f, indent=4) def load_config(): - current_dir = os.path.dirname(os.path.abspath(__file__)) - config_path = os.path.join(current_dir, "Configs", "OnlyFans", "Config.json") - with open(config_path) as f: - config = json.load(f) - return config + ensure_onlyfans_config_files() + config_path = get_onlyfans_paths()["config_file"] + defaults = { + "settings": { + "custom_filename_prefix": "", + "watermark_text": "", + "use_month_names": False, + "use_month_numbers": False, + "no_year_folders": True, + "disable_cover_highlights": False, + "disable_folder_highlights": False, + "disable_download_post_with_txt": True, + "download_tagged_posts": False, + "merge_tagged_media": False, + "download_labels": False, + "thread_workers_count": 2 + } + } + + if os.path.exists(config_path): + try: + with open(config_path, 'r', encoding='utf-8') as f: + config = json.load(f) + if isinstance(config, dict): + settings = config.setdefault("settings", {}) + for key, value in defaults["settings"].items(): + settings.setdefault(key, value) + return config + except Exception: + pass + + return defaults CONFIG = load_config() CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Cache") @@ -957,25 +1469,25 @@ def load_config(): NUM_THREADS = CONFIG['settings']['thread_workers_count'] def check_and_clear_cache_if_user_id_changed(): - current_dir = os.path.dirname(os.path.abspath(__file__)) - user_id_cache_path = os.path.join(current_dir, "Configs", "OnlyFans", "user_id_cache.txt") - auth_file_path = os.path.join(current_dir, "Configs", "OnlyFans", "Auth.json") + paths = get_onlyfans_paths() + user_id_cache_path = os.path.join(paths["config_dir"], "user_id_cache.txt") + current_user_id = str(load_auth_json().get("user-id", "")).strip() - with open(auth_file_path) as f: - current_user_id = json.load(f)["user-id"] + if not current_user_id: + return if not os.path.exists(user_id_cache_path): - with open(user_id_cache_path, 'w') as f: + with open(user_id_cache_path, 'w', encoding='utf-8') as f: f.write(current_user_id) return - with open(user_id_cache_path, 'r') as f: + with open(user_id_cache_path, 'r', encoding='utf-8') as f: cached_user_id = f.read().strip() if current_user_id != cached_user_id: if os.path.exists(CACHE_DIR): shutil.rmtree(CACHE_DIR) - with open(user_id_cache_path, 'w') as f: + with open(user_id_cache_path, 'w', encoding='utf-8') as f: f.write(current_user_id) check_and_clear_cache_if_user_id_changed() @@ -1499,7 +2011,10 @@ def download_drm_video(mpd_url, output_path, output_name, post_id, cookies_overr cmd.extend(["-H", f"Cookie: {dl_cookie}"]) try: - process = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + process = subprocess.run(cmd, capture_output=True, text=True) + log_debug(f"N_m3u8DL-RE returncode: {process.returncode}") + if process.stdout: log_debug(f"N_m3u8DL-RE stdout: {process.stdout[:500]}") + if process.stderr: log_debug(f"N_m3u8DL-RE stderr: {process.stderr[:500]}") return process.returncode == 0 except Exception as e: log_debug(f"Subprocess exception: {e}") @@ -2234,19 +2749,652 @@ def count_files(posts): count += len(post["media"]) return count + +def load_pyside6_webengine(verbose=True, try_install=True): + try: + from PySide6.QtCore import QTimer, QUrl + from PySide6.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QPushButton + from PySide6.QtWebEngineCore import QWebEngineProfile, QWebEnginePage, QWebEngineUrlRequestInterceptor + from PySide6.QtWebEngineWidgets import QWebEngineView + return { + "QTimer": QTimer, + "QUrl": QUrl, + "QApplication": QApplication, + "QWidget": QWidget, + "QVBoxLayout": QVBoxLayout, + "QLabel": QLabel, + "QPushButton": QPushButton, + "QWebEngineProfile": QWebEngineProfile, + "QWebEnginePage": QWebEnginePage, + "QWebEngineUrlRequestInterceptor": QWebEngineUrlRequestInterceptor, + "QWebEngineView": QWebEngineView, + } + except Exception as first_error: + if verbose: + print(f"Embedded login runtime missing in interpreter: {sys.executable}") + print(f"PySide6 import failed: {first_error}") + + if try_install: + try: + install_cmd = [sys.executable, "-m", "pip", "install", "--user", "PySide6"] + if verbose: + print("Trying automatic install with current interpreter...") + proc = subprocess.run(install_cmd, capture_output=True, text=True) + if verbose and proc.returncode != 0: + stderr_tail = (proc.stderr or "").strip().splitlines()[-3:] + if stderr_tail: + print("Auto-install error:") + for line in stderr_tail: + print(line) + except Exception as install_error: + if verbose: + print(f"Automatic install could not run: {install_error}") + + try: + from PySide6.QtCore import QTimer, QUrl + from PySide6.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QPushButton + from PySide6.QtWebEngineCore import QWebEngineProfile, QWebEnginePage, QWebEngineUrlRequestInterceptor + from PySide6.QtWebEngineWidgets import QWebEngineView + if verbose: + print("PySide6 auto-install ok.") + return { + "QTimer": QTimer, + "QUrl": QUrl, + "QApplication": QApplication, + "QWidget": QWidget, + "QVBoxLayout": QVBoxLayout, + "QLabel": QLabel, + "QPushButton": QPushButton, + "QWebEngineProfile": QWebEngineProfile, + "QWebEnginePage": QWebEnginePage, + "QWebEngineUrlRequestInterceptor": QWebEngineUrlRequestInterceptor, + "QWebEngineView": QWebEngineView, + } + except Exception as second_error: + if verbose: + print(f"Embedded login still unavailable: {second_error}") + if system == "Linux": + print("On Debian/Kali, PySide6 modules can be split into packages.") + print("Install this if pip is not enough:") + print("sudo apt install python3-pyside6.qtcore python3-pyside6.qtwidgets python3-pyside6.qtwebenginecore python3-pyside6.qtwebenginewidgets") + return None + + +def launch_embedded_login_window(): + qt = load_pyside6_webengine(verbose=True, try_install=True) + if not qt: + return 1 + + QTimer = qt["QTimer"] + QUrl = qt["QUrl"] + QApplication = qt["QApplication"] + QWidget = qt["QWidget"] + QVBoxLayout = qt["QVBoxLayout"] + QLabel = qt["QLabel"] + QPushButton = qt["QPushButton"] + QWebEngineProfile = qt["QWebEngineProfile"] + QWebEnginePage = qt["QWebEnginePage"] + QWebEngineUrlRequestInterceptor = qt["QWebEngineUrlRequestInterceptor"] + QWebEngineView = qt["QWebEngineView"] + + os.environ.setdefault("QTWEBENGINE_CHROMIUM_FLAGS", "--disable-gpu --disable-logging --log-level=3 --disable-features=WebGPU") + os.environ.setdefault("QT_LOGGING_RULES", "*.debug=false;qt.webenginecontext.debug=false") + + def qbyte_to_text(value): + try: + return bytes(value).decode('utf-8', 'ignore') + except Exception: + try: + return value.data().decode('utf-8', 'ignore') + except Exception: + return str(value) + + class HeaderInterceptor(QWebEngineUrlRequestInterceptor): + def __init__(self): + super().__init__() + self.headers = {} + + def interceptRequest(self, info): + try: + url = info.requestUrl().toString() + if "onlyfans.com" not in url: + return + + raw_headers = {} + for key, value in info.httpHeaders().items(): + raw_headers[qbyte_to_text(key).lower()] = qbyte_to_text(value) + + if raw_headers.get("x-bc"): + self.headers.update(raw_headers) + except Exception: + pass + + class SilentWebEnginePage(QWebEnginePage): + def javaScriptConsoleMessage(self, level, message, line_number, source_id): + ignored = ( + "Permissions-Policy header", + "font-size:0;color:transparent NaN", + "Failed to create WebGPU Context Provider", + ) + try: + if any(x in str(message) for x in ignored): + return + except Exception: + pass + return + + class LoginWindow(QWidget): + def __init__(self): + super().__init__() + self.setWindowTitle("OnlySnap Login") + self.resize(1200, 900) + self.saved = False + self.user_agent = "" + self.cookies = {} + self.auth_redirect_done = False + + paths = get_onlyfans_paths() + login_profile_dir = os.path.join(paths["current_dir"], "Cache", "webview_profile") + assure_dir(login_profile_dir) + assure_dir(os.path.join(login_profile_dir, "storage")) + assure_dir(os.path.join(login_profile_dir, "cache")) + + self.interceptor = HeaderInterceptor() + self.profile = QWebEngineProfile("onlysnap_login_profile", self) + self.profile.setPersistentStoragePath(os.path.join(login_profile_dir, "storage")) + self.profile.setCachePath(os.path.join(login_profile_dir, "cache")) + self.profile.setPersistentCookiesPolicy(QWebEngineProfile.ForcePersistentCookies) + self.profile.setHttpCacheType(QWebEngineProfile.DiskHttpCache) + self.profile.setUrlRequestInterceptor(self.interceptor) + + self.page = SilentWebEnginePage(self.profile, self) + self.view = QWebEngineView(self) + self.view.setPage(self.page) + + self.status_label = QLabel("Log in to OnlyFans. Session will save automatically.") + self.save_button = QPushButton("Save Session Now") + self.close_button = QPushButton("Close") + + layout = QVBoxLayout(self) + layout.addWidget(self.status_label) + layout.addWidget(self.view) + layout.addWidget(self.save_button) + layout.addWidget(self.close_button) + + self.save_button.clicked.connect(self.try_save_session) + self.close_button.clicked.connect(self.close) + self.view.loadFinished.connect(self.on_load_finished) + self.view.urlChanged.connect(self.on_url_changed) + + cookie_store = self.profile.cookieStore() + cookie_store.cookieAdded.connect(self.on_cookie_added) + cookie_store.loadAllCookies() + + self.timer = QTimer(self) + self.timer.setInterval(1500) + self.timer.timeout.connect(self.periodic_check) + self.timer.start() + + self.view.load(QUrl("https://onlyfans.com/")) + + def on_cookie_added(self, cookie): + try: + name = qbyte_to_text(cookie.name()) + value = qbyte_to_text(cookie.value()) + if name: + self.cookies[name] = value + except Exception: + pass + + def on_load_finished(self, ok): + if ok: + try: + self.user_agent = self.profile.httpUserAgent() or self.user_agent + except Exception: + pass + try: + self.profile.cookieStore().loadAllCookies() + except Exception: + pass + self.periodic_check() + + def on_url_changed(self, url): + current = url.toString() + if "onlyfans.com" in current and self.cookies.get("auth_id") and not self.auth_redirect_done: + if "/my/" not in current: + self.auth_redirect_done = True + self.view.load(QUrl("https://onlyfans.com/my/subscriptions")) + self.periodic_check() + + def periodic_check(self): + try: + self.profile.cookieStore().loadAllCookies() + except Exception: + pass + self.try_save_session(silent=True) + + def build_auth_payload(self): + current_auth = load_auth_json() + header_cookies = parse_cookie_header(self.interceptor.headers.get("cookie", "")) + merged_cookies = {} + merged_cookies.update(self.cookies) + merged_cookies.update(header_cookies) + + payload = { + "user-agent": self.interceptor.headers.get("user-agent") or self.user_agent or current_auth.get("user-agent", ""), + "user-id": self.interceptor.headers.get("user-id") or merged_cookies.get("auth_id") or current_auth.get("user-id", ""), + "x-bc": self.interceptor.headers.get("x-bc") or current_auth.get("x-bc", ""), + "x-hash": self.interceptor.headers.get("x-hash", current_auth.get("x-hash", "")), + "x-of-rev": self.interceptor.headers.get("x-of-rev", current_auth.get("x-of-rev", "")), + "sess": merged_cookies.get("sess", current_auth.get("sess", "")), + "st": merged_cookies.get("st", current_auth.get("st", "")), + "cf_bm": merged_cookies.get("__cf_bm", current_auth.get("cf_bm", "")), + "cfuvid": merged_cookies.get("_cfuvid", current_auth.get("cfuvid", "")), + "fp": merged_cookies.get("fp", current_auth.get("fp", "")), + } + return payload + + def try_save_session(self, silent=False): + payload = self.build_auth_payload() + required = [payload.get("user-agent"), payload.get("user-id"), payload.get("x-bc"), payload.get("sess")] + + if not all(str(x).strip() for x in required): + if not silent: + self.status_label.setText("Login detected, but session headers are not complete yet. Wait a bit more.") + return False + + save_auth_json(payload) + self.saved = True + self.status_label.setText("Session saved. Closing...") + print("Embedded login success: Auth.json updated.") + QTimer.singleShot(700, self.close) + return True + + def closeEvent(self, event): + if not self.saved: + self.try_save_session(silent=True) + + try: + self.timer.stop() + except Exception: + pass + + try: + cookie_store = self.profile.cookieStore() + cookie_store.cookieAdded.disconnect(self.on_cookie_added) + except Exception: + pass + + try: + self.view.loadFinished.disconnect(self.on_load_finished) + except Exception: + pass + try: + self.view.urlChanged.disconnect(self.on_url_changed) + except Exception: + pass + + try: + self.view.setPage(None) + except Exception: + pass + try: + self.page.deleteLater() + except Exception: + pass + try: + self.view.deleteLater() + except Exception: + pass + try: + self.profile.setUrlRequestInterceptor(None) + except Exception: + pass + try: + self.profile.deleteLater() + except Exception: + pass + + return super().closeEvent(event) + + app = QApplication.instance() + owns_app = False + if app is None: + app = QApplication(sys.argv) + owns_app = True + + window = LoginWindow() + window.show() + result = app.exec() if owns_app else 0 + + if window.saved or has_valid_auth(): + return 0 + return result or 1 + + +def find_chrome_binary(): + candidates = [] + if system == "Windows": + local = os.environ.get("LOCALAPPDATA", "") + program_files = os.environ.get("PROGRAMFILES", "") + program_files_x86 = os.environ.get("PROGRAMFILES(X86)", "") + candidates.extend([ + os.path.join(program_files, "Google", "Chrome", "Application", "chrome.exe"), + os.path.join(program_files_x86, "Google", "Chrome", "Application", "chrome.exe"), + os.path.join(local, "Google", "Chrome", "Application", "chrome.exe"), + os.path.join(program_files, "Chromium", "Application", "chrome.exe"), + os.path.join(program_files_x86, "Chromium", "Application", "chrome.exe"), + ]) + else: + for name in ["google-chrome", "google-chrome-stable", "chromium", "chromium-browser", "chrome"]: + found = shutil.which(name) + if found: + return found + for c in candidates: + if c and os.path.isfile(c): + return c + return None + + +def get_current_monitor_position(): + """ + Retorna (x, y) de la esquina superior izquierda del monitor + donde está corriendo el proceso actual. Si no se puede detectar, + retorna None. + """ + try: + if system == "Windows": + import ctypes + user32 = ctypes.windll.user32 + cursor_x = ctypes.c_int() + cursor_y = ctypes.c_int() + # Obtener posición del cursor como referencia del monitor activo + ctypes.windll.user32.GetCursorPos(ctypes.byref(ctypes.wintypes.POINT())) + # Usar la ventana de consola activa + hwnd = ctypes.windll.kernel32.GetConsoleWindow() + if not hwnd: + return None + rect = ctypes.wintypes.RECT() + ctypes.windll.user32.GetWindowRect(hwnd, ctypes.byref(rect)) + win_center_x = (rect.left + rect.right) // 2 + win_center_y = (rect.top + rect.bottom) // 2 + # Buscar en qué monitor está ese punto + monitors = [] + def _monitor_enum_cb(hmon, hdc, lprect, lparam): + r = lprect.contents + monitors.append((r.left, r.top, r.right, r.bottom)) + return 1 + MonitorEnumProc = ctypes.WINFUNCTYPE( + ctypes.c_bool, + ctypes.c_ulong, ctypes.c_ulong, + ctypes.POINTER(ctypes.wintypes.RECT), + ctypes.c_double + ) + cb = MonitorEnumProc(_monitor_enum_cb) + ctypes.windll.user32.EnumDisplayMonitors(None, None, cb, 0) + for left, top, right, bottom in monitors: + if left <= win_center_x < right and top <= win_center_y < bottom: + return (left, top) + return None + + else: + # Linux: usar solo xrandr para obtener el monitor primario sin generar eventos X11 + xrandr_path = shutil.which("xrandr") + if not xrandr_path: + try: + subprocess.run( + ["sudo", "apt-get", "install", "-y", "x11-xserver-utils"], + capture_output=True, timeout=30 + ) + xrandr_path = shutil.which("xrandr") + except Exception: + pass + + if not xrandr_path: + return None + + xrandr_result = subprocess.run( + ["xrandr", "--query"], + capture_output=True, text=True, timeout=3 + ) + import re as _re + primary_position = None + first_connected_position = None + for xrandr_line in xrandr_result.stdout.splitlines(): + match = _re.search(r'(\d+)x(\d+)\+(\d+)\+(\d+)', xrandr_line) + if match and " connected" in xrandr_line: + mon_x = int(match.group(3)) + mon_y = int(match.group(4)) + if first_connected_position is None: + first_connected_position = (mon_x, mon_y) + if " primary" in xrandr_line: + primary_position = (mon_x, mon_y) + return primary_position or first_connected_position + + except Exception: + return None + + +def launch_visible_chrome_login_window(): + try: + import websocket as _websocket_module + except ImportError: + print("[*] Installing websocket-client...") + subprocess.run([sys.executable, "-m", "pip", "install", "websocket-client"], check=False) + try: + import websocket as _websocket_module + except ImportError: + print("Could not install websocket-client. Cannot launch Chrome login.") + return 1 + + import socket as _socket + import json as _json + + chrome_binary = find_chrome_binary() + if not chrome_binary: + print("Chrome/Chromium not found.") + return 1 + + paths = get_onlyfans_paths() + login_profile_dir = os.path.join(paths["current_dir"], "Cache", "chrome_login_profile") + assure_dir(login_profile_dir) + + # Find a free port for CDP + with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _sock: + _sock.bind(("127.0.0.1", 0)) + cdp_port = _sock.getsockname()[1] + + chrome_cmd = [ + chrome_binary, + f"--remote-debugging-port={cdp_port}", + "--remote-allow-origins=*", + f"--user-data-dir={login_profile_dir}", + "--no-first-run", + "--no-default-browser-check", + "--new-window", + ] + + monitor_position = get_current_monitor_position() + if monitor_position: + chrome_cmd.append(f"--window-position={monitor_position[0]},{monitor_position[0]}") + + chrome_cmd.append("https://onlyfans.com/") + + try: + chrome_proc = subprocess.Popen( + chrome_cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + except Exception as e: + print(f"Could not launch Chrome: {e}") + return 1 + + print(f"Chrome login opened with isolated profile: {login_profile_dir}") + print("Log in to OnlyFans. Session will save automatically.") + + # Wait for CDP to be ready + cdp_ws_url = None + for _attempt in range(30): + time.sleep(0.5) + try: + cdp_response = requests.get(f"http://127.0.0.1:{cdp_port}/json", timeout=2) + cdp_pages = cdp_response.json() + for cdp_page in cdp_pages: + if cdp_page.get("type") == "page": + cdp_ws_url = cdp_page["webSocketDebuggerUrl"] + break + if cdp_ws_url: + break + except Exception: + pass + + if not cdp_ws_url: + print("Could not connect to Chrome DevTools Protocol.") + chrome_proc.terminate() + return 1 + + captured_of_headers = {} + captured_of_cookies = {} + session_saved_event = threading.Event() + cdp_msg_id_counter = [0] + cdp_lock = threading.Lock() + + def get_next_cdp_id(): + with cdp_lock: + cdp_msg_id_counter[0] += 1 + return cdp_msg_id_counter[0] + + def try_build_and_save_auth(): + current_auth = load_auth_json() + header_cookies = parse_cookie_header(captured_of_headers.get("cookie", "")) + merged_cookies = {} + merged_cookies.update(captured_of_cookies) + merged_cookies.update(header_cookies) + + auth_payload = { + "user-agent": captured_of_headers.get("user-agent") or current_auth.get("user-agent", ""), + "user-id": captured_of_headers.get("user-id") or merged_cookies.get("auth_id") or current_auth.get("user-id", ""), + "x-bc": captured_of_headers.get("x-bc") or current_auth.get("x-bc", ""), + "x-hash": captured_of_headers.get("x-hash", current_auth.get("x-hash", "")), + "x-of-rev": captured_of_headers.get("x-of-rev", "202602012155-7f8fb7678a"), + "sess": merged_cookies.get("sess", current_auth.get("sess", "")), + "st": merged_cookies.get("st", current_auth.get("st", "")), + "cf_bm": merged_cookies.get("__cf_bm", current_auth.get("cf_bm", "")), + "cfuvid": merged_cookies.get("_cfuvid", current_auth.get("cfuvid", "")), + "fp": merged_cookies.get("fp", current_auth.get("fp", "")), + } + + required_fields = [ + auth_payload.get("user-agent"), + auth_payload.get("user-id"), + auth_payload.get("x-bc"), + auth_payload.get("sess"), + ] + if not all(str(field).strip() for field in required_fields if field): + return False + if not all(required_fields): + return False + + save_auth_json(auth_payload) + print("Chrome login success: Auth.json updated.") + session_saved_event.set() + return True + + def on_cdp_message(cdp_ws, raw_message): + try: + cdp_data = _json.loads(raw_message) + except Exception: + return + + cdp_method = cdp_data.get("method", "") + + if cdp_method == "Network.requestWillBeSent": + request_params = cdp_data.get("params", {}) + request_info = request_params.get("request", {}) + request_url = request_info.get("url", "") + if "onlyfans.com" not in request_url: + return + request_headers = {k.lower(): v for k, v in request_info.get("headers", {}).items()} + if request_headers.get("x-bc"): + captured_of_headers.update(request_headers) + cdp_ws.send(_json.dumps({ + "id": get_next_cdp_id(), + "method": "Network.getCookies", + "params": {"urls": ["https://onlyfans.com"]} + })) + + cdp_result = cdp_data.get("result", {}) + cookies_from_cdp = cdp_result.get("cookies", []) + if cookies_from_cdp: + for cookie_entry in cookies_from_cdp: + captured_of_cookies[cookie_entry["name"]] = cookie_entry["value"] + if not session_saved_event.is_set(): + try_build_and_save_auth() + + def on_cdp_open(cdp_ws): + cdp_ws.send(_json.dumps({ + "id": get_next_cdp_id(), + "method": "Network.enable", + "params": {} + })) + + cdp_ws_app = _websocket_module.WebSocketApp( + cdp_ws_url, + on_message=on_cdp_message, + on_open=on_cdp_open, + on_error=lambda ws, err: None, + on_close=lambda ws, code, msg: None, + ) + + cdp_thread = threading.Thread(target=cdp_ws_app.run_forever, daemon=True) + cdp_thread.start() + + while not session_saved_event.is_set(): + if chrome_proc.poll() is not None: + break + time.sleep(1) + + if session_saved_event.is_set(): + time.sleep(0.7) + try: + chrome_proc.terminate() + except Exception: + pass + + try: + cdp_ws_app.close() + except Exception: + pass + + if session_saved_event.is_set() or has_valid_auth(): + return 0 + return 1 + + if __name__ == "__main__": + parser = argparse.ArgumentParser(add_help=False) + parser.add_argument("--login-chrome", action="store_true") + parser.add_argument("--login-webview", action="store_true") + args, _ = parser.parse_known_args() + + if args.login_chrome or args.login_webview: + sys.exit(launch_visible_chrome_login_window()) + try: + init() + ensure_onlyfans_config_files() + print(f"{Fore.YELLOW}[*] Loading Auth...{Style.RESET_ALL}") - API_HEADER = create_auth() - - if not API_HEADER or "user-id" not in API_HEADER: - print(f"{Fore.RED}ERROR: Invalid Auth.json.{Style.RESET_ALL}") - sys.exit(1) - + API_HEADER = create_auth() or {} + + if not API_HEADER or "user-id" not in API_HEADER or "x-bc" not in API_HEADER: + print(f"{Fore.YELLOW}[*] No valid Auth.json yet. Use the Login button inside OnlySnap.{Style.RESET_ALL}") + try: print(f"{Fore.YELLOW}[*] Loading dynamic rules...{Style.RESET_ALL}") - dynamic_rules = requests.get('https://raw.githubusercontent.com/DATAHOARDERS/dynamic-rules/main/onlyfans.json').json() - except: + dynamic_rules = requests.get('https://raw.githubusercontent.com/DATAHOARDERS/dynamic-rules/main/onlyfans.json', timeout=10).json() + except Exception: print("Warning: Could not download dynamic rules (offline?)") dynamic_rules = {} @@ -2256,6 +3404,5 @@ def count_files(posts): print(f"{Fore.RED}STARTUP ERROR: {e}{Style.RESET_ALL}") sys.exit(1) - init() app = OnlySnapTUI() app.run() diff --git a/README.md b/README.md index 59620ab..9df9030 100644 --- a/README.md +++ b/README.md @@ -1,83 +1,119 @@ -**Yoooo** - -This project was **COOKED**. Deadass forgotten for 3 years. But I locked in, cracked a Monster, and rewrote the whole thing. +# OnlySnap -We ain't using basic scripts anymore. We got a **TUI** (Terminal User Interface). +> Fork of [jordon31/OnlySnap](https://github.com/jordon31/OnlySnap) with automated login and dependency management. -**Check this out:** No more sweating over `cmd` commands or typing manual inputs. -You can letterally **CLICK** on stuff now. +A **TUI** (Terminal User Interface) tool for OnlyFans content downloading. No command-line knowledge needed, just click and download. -It's stupid easy. Other tools out there require a PhD in coding to run; mine is built different. **EZ.** No complex garbage, just click and download. +## What's changed from the original -## HOW TO START (video for fast) +- **Automated login:** Replaced manual cookie extraction (F12, DevTools, copy-paste headers) with an automated Chrome login window. +- **Isolated Chrome instance:** The login opens a separate Chrome profile that doesn't touch the user's personal browser data. +- **Automatic dependency detection:** The app checks for missing tools (N_m3u8DL-RE, mp4decrypt, ffmpeg) on startup and offers to download and install them automatically. +- **Removed `cookie-onlyfans.py`:** No longer needed since login is handled automatically. -`.bat` for fast +--- + +## Requirements + +- **Python 3.10+** +- **Google Chrome** installed (required for the automated login) +- **Pillow** (included in `requirements.txt`, required for watermarks) +- Install dependencies: `pip install -r requirements.txt` + +### Platform support + +| Platform | Status | +| --- | --- | +| **Linux** | ✅ Tested | +| **Windows** | ⚠️ Should work, not fully tested yet | +| **macOS** | ⚠️ Should work, not fully tested yet | + +--- -1.  **Python 3.10+** required. -2.  Install requirements: `pip install -r requirements.txt` (Now includes **Pillow** for the Watermark magic). -3.  Click `!run.bat`. -4.  Video: [tutorial](https://youtu.be/5tMwjg5hXNY) +## HOW TO START + +1. Run `!run.bat` (Windows) or `python OnlySnap.py` (Linux/Mac) ### THE LAUNCHER -Inside `!run.bat` you have 2 options. Don't mess this up. +Inside `!run.bat`: -* **[1] START ONLYSNAP:** -    * Opens the main app. -* **[2] AUTO-PASTE (DO THIS FIRST!):** -    * Go to Browser -> F12 -> Network -> Click on "Fetch/XHR" -> chat api for no miss nothing. [chats](https://onlyfans.com/my/chats) -    * Copy the headers/request. - -    *  ![Screenshot Cookies](https://i.postimg.cc/TwS30f62/Screenshot-2026-02-09-021826.png) - -    * Run Option 2. The script **yeets** the cookies straight into the config. No manual copy-paste struggle. +* **[1] START ONLYSNAP** — Opens the main app. +* **[2] INSTALL DRM TOOLS** — Downloads FFmpeg, mp4decrypt and N_m3u8DL-RE into the `dmr` folder. --- -## ⚙️ SETTINGS (STOP EDITING JSON FILES) +## 🔐 AUTOMATED LOGIN + +No more manual cookie extraction. Here's how it works: + +1. Click the **Login** button inside the app. +2. An isolated Chrome window opens (separate from your personal browser). +3. Log in to OnlyFans normally from that window. +4. Once logged in, the app captures the cookies automatically in the background. +5. The Chrome window closes and the session is saved. Ready to use. + +If a saved session already exists, the app asks if you want to replace it with a new one. + +🎥 **Login Demo:** + + -You don't need to touch `Config.json` like a caveman anymore.  -Inside the app, there is a **[SETTINGS]** button. Click it. -**BIG NEWS:** Everything you change is **AUTO-SAVED IN REAL-TIME**. No "Save" button, no "I forgot to click apply". You type, it saves. Period. -| Setting | Translation for Dummies | +https://github.com/user-attachments/assets/7f336ee4-343f-4278-a657-f6703e664abb + + + + + +## 🔧 AUTOMATIC DEPENDENCY INSTALLATION + +On startup, the app checks if the following tools are present: + +| Tool | What it does | | --- | --- | -| **Custom Filename** | Add your branding (es. `@MyChannel`). Leave empty for original IDs. | -| **Watermark Text** | Type your text. It adds a sleek, dynamic watermark on every photo. | -| **Month Names** | `true` = "January", `false` = Numbers. Aesthetic choice. | -| **No Year Folders** | If `true`, it dumps everything in one place. Chaotic evil. | -| **Skip Highlights Covers** | Saves space. Who looks at covers anyway? | -| **Disable Text Files** | `true` = Only Media. `false` = Includes a `.txt` with the post caption. | -| **Download Tagged** | Downloads SPAM/ADS (#ad). Keep it `false` unless you love commercials. | -| **Workers (Threads)** | Speed. Default is 5. High values = Fast, but don't fry your CPU. | +| **N_m3u8DL-RE** | Downloads encrypted video streams | +| **mp4decrypt** | Decrypts DRM-protected media | +| **ffmpeg** | Media processing and conversion | + +If any are missing, a screen appears offering to download and install them automatically into the `dmr` folder. Downloads are pulled directly from the official sources for each platform. --- -## ⚠️ ATTENTION: FILENAME LOGIC +## ⚙️ SETTINGS -Read this or don't complain later. -The script checks if a file exists by its **Name**. +Settings are managed directly from the app using the **[SETTINGS]** button. No need to manually edit `Config.json`. All changes are **auto-saved in real-time**. + +| Setting | Description | +| --- | --- | +| **Custom Filename** | Add a prefix to filenames. Leave empty for original IDs. | +| **Watermark Text** | Adds a dynamic watermark on every photo. | +| **Month Names** | `true` = month names, `false` = numbers. | +| **No Year Folders** | `true` = all files in one folder. | +| **Skip Highlights Covers** | Skips highlight cover images. | +| **Disable Text Files** | `true` = only media, `false` = includes `.txt` with post caption. | +| **Download Tagged** | Downloads SPAM/AD posts. Default `false`. | +| **Workers (Threads)** | Download speed. Default is 5. | -* If you set a **Custom Filename Prefix** (e.g., `MyStore_12345.jpg`), the script saves it like that. -* If you later **DELETE** the prefix or change it, the script will look for `12345.jpg`, won't find it, and **WILL DOWNLOAD EVERYTHING AGAIN**. --- -## 📸 SMART WATERMARK +## ⚠️ FILENAME LOGIC + +The script checks if a file exists by its **name**. If a **Custom Filename Prefix** is set and later removed, files will be re-downloaded because the expected filename changes. -We added a high-end **Auto-Marker** for photos. +--- + +## 📸 SMART WATERMARK -* **Dynamic Sizing:** It detects the photo resolution and adapts the text size so it's never too big or too small. -* **Elegant Design:** White text, subtle shadow, and a semi-transparent dark background in the bottom-left corner. -* **How to use:** Just put your text in Settings. If you want a specific vibe, drop your favorite `.ttf` font into the main folder. +Automatic watermark system for photos with dynamic sizing based on resolution. White text with subtle shadow on a semi-transparent background. Custom `.ttf` fonts can be placed in the main folder. --- ## 🔧 EXTRAS * **Telegram:** [https://t.me/OnlySnap0](https://t.me/OnlySnap0) -* **Credits:** Me. I built this while you were sleeping. -* **Bugs:** It works on my machine. (any problem / suggestion open issues) -* **Disclaimer:** For educational purposes only (wink wink). +* **Bugs / Suggestions:** Open an issue. +* **Disclaimer:** For educational purposes only. OnlyFans Scrape - Scrape OnlyFans diff --git a/cookie-onlyfans.py b/cookie-onlyfans.py deleted file mode 100644 index 0c00c4c..0000000 --- a/cookie-onlyfans.py +++ /dev/null @@ -1,162 +0,0 @@ -import json -import os -import sys -import time -import shutil -import tkinter as tk - -def get_clipboard_content(): - try: - root = tk.Tk() - root.withdraw() - content = root.clipboard_get() - root.destroy() - return content - except: - return "" - -def parse_smart(raw_text): - data = {} - lines = raw_text.splitlines() - - target_keys = [ - "user-agent", "x-hash", "x-bc", "user-id", "cookie", - "x-of-rev", "sign", "accept" - ] - - i = 0 - while i < len(lines): - line = lines[i].strip() - if not line: - i += 1 - continue - - if ":" in line: - parts = line.split(":", 1) - k = parts[0].strip().lower() - v = parts[1].strip() - - if k.startswith(":"): k = k[1:] - - if k in target_keys or k == "cookie": - if v: - data[k] = v - - key_lower = line.lower() - if key_lower in target_keys: - if i + 1 < len(lines): - next_line = lines[i+1].strip() - if next_line: - data[key_lower] = next_line - i += 1 - - if "sess=" in line and "cookie" not in data: - if not line.lower().startswith("cookie:") and not line.lower().startswith("set-cookie:"): - data["cookie"] = line - - i += 1 - - return data - -def parse_cookies(cookie_str): - cookies = {} - if not cookie_str: return cookies - parts = cookie_str.split(';') - for part in parts: - if '=' in part: - k, v = part.strip().split('=', 1) - cookies[k.strip()] = v.strip() - return cookies - -def main(): - os.system('cls' if os.name == 'nt' else 'clear') - print("OnlyFans Cookies") - print("---------------------------------------") - - print("Checking clipboard...") - raw_text = get_clipboard_content() - - extracted = parse_smart(raw_text) - - has_headers = False - if extracted.get("user-agent") or extracted.get("cookie") or "sess=" in raw_text: - has_headers = True - - if not has_headers: - print("\nERROR: No Headers found!") - print("Please copy the 'onlyfans.com/api2/v2/lists?filter=chat' section from DevTools.") - time.sleep(3) - return - - cookies = parse_cookies(extracted.get("cookie", "")) - - if not extracted.get("user-agent") and not cookies.get("sess"): - print("\nData looks incomplete (Missing UA or Session). Aborting.") - time.sleep(2) - return - - print("Data looks valid. Processing...") - - base_path = os.path.dirname(os.path.abspath(__file__)) - config_dir = os.path.join(base_path, "Configs", "OnlyFans") - - if not os.path.exists(config_dir): - os.makedirs(config_dir) - - config_file = os.path.join(config_dir, "Config.json") - config_example = os.path.join(config_dir, "Config.json.example") - - if not os.path.exists(config_file) and os.path.exists(config_example): - try: - shutil.copy(config_example, config_file) - except: pass - - auth_file = os.path.join(config_dir, "Auth.json") - auth_example = os.path.join(config_dir, "Auth.json.example") - - current_auth = {} - - if os.path.exists(auth_file): - try: - with open(auth_file, "r", encoding="utf-8") as f: - current_auth = json.load(f) - except: - current_auth = {} - - elif os.path.exists(auth_example): - print("Creating new Auth.json from example...") - try: - shutil.copy(auth_example, auth_file) - with open(auth_file, "r", encoding="utf-8") as f: - current_auth = json.load(f) - except: pass - else: - print("⚠️ Warning: Neither Auth.json nor Auth.json.example found. Creating from scratch.") - - auth_update = { - "user-agent": extracted.get("user-agent", current_auth.get("user-agent", "")), - "user-id": extracted.get("user-id", cookies.get("auth_id", current_auth.get("user-id", ""))), - "x-bc": extracted.get("x-bc", cookies.get("fp", current_auth.get("x-bc", ""))), - "x-hash": extracted.get("x-hash", current_auth.get("x-hash", "")), - "x-of-rev": extracted.get("x-of-rev", "202602012155-7f8fb7678a"), - "sess": cookies.get("sess", current_auth.get("sess", "")), - "st": cookies.get("st", current_auth.get("st", "")), - "cf_bm": cookies.get("__cf_bm", current_auth.get("cf_bm", "")), - "cfuvid": cookies.get("_cfuvid", current_auth.get("cfuvid", "")), - "fp": cookies.get("fp", current_auth.get("fp", "")), - } - - current_auth.update(auth_update) - - try: - with open(auth_file, "w", encoding="utf-8") as f: - json.dump(current_auth, f, indent=4) - print(f"\n✅ Success! Updated: {auth_file}") - except Exception as e: - print(f"Error saving file: {e}") - - print("Bye.") - time.sleep(1.5) - -if __name__ == "__main__": - main() \ No newline at end of file