#!/usr/bin/env python3 """ Telegram File Manager Bot for Android (Pydroid 3) Production-grade, high-performance file management bot with admin-only control. Single-file implementation with persistent storage and RAM caching. """ import os import sys import json import time import hashlib import threading import queue from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timedelta from pathlib import Path from collections import defaultdict, deque from typing import Optional, Dict, List, Any, Callable import logging import re # Third-party imports try: import telebot from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton, InputFile from telebot.apihelper import ApiTelegramException except ImportError: print("Installing pyTelegramBotAPI...") os.system(f"{sys.executable} -m pip install pyTelegramBotAPI") import telebot from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton, InputFile from telebot.apihelper import ApiTelegramException # ============================================================================= # CONFIGURATION # ============================================================================= DATA_DIR = Path(__file__).parent / "bot_data" DATA_DIR.mkdir(exist_ok=True) PERSISTENCE_FILE = DATA_DIR / "bot_state.json" LOG_FILE = DATA_DIR / "bot.log" # Thread pool sizes (tuned for mobile) UI_WORKERS = 4 SCAN_WORKERS = 2 UPLOAD_WORKERS = 3 SCHEDULER_WORKERS = 2 # Pagination FOLDERS_PER_PAGE = 10 FILES_PER_PAGE = 15 # Retry configuration MAX_RETRIES = 4 RETRY_DELAYS = [1, 3, 7, 15] # Exponential backoff # File type definitions FILE_TYPES = { 'images': {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg', '.ico', '.tiff', '.tif'}, 'videos': {'.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.m4v', '.3gp', '.ts'}, 'audios': {'.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a', '.wma', '.opus', '.aiff'}, 'documents': {'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.rtf'}, 'archives': {'.zip', '.rar', '.7z', '.tar', '.gz', '.bz2', '.xz'}, 'code': {'.py', '.js', '.html', '.css', '.java', '.cpp', '.c', '.h', '.php', '.rb', '.go', '.rs', '.swift'} } # ============================================================================= # LOGGING SETUP # ============================================================================= logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(LOG_FILE, encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger('FileManagerBot') # ============================================================================= # STATE MANAGEMENT # ============================================================================= class BotState: """Thread-safe persistent state management with RAM caching.""" _instance = None _lock = threading.RLock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self._initialized = True self._save_lock = threading.RLock() self._cache_lock = threading.RLock() # Persistent data (auto-saved) self.admin_id: Optional[int] = None self.admin_settings: Dict[str, Any] = {} self.channels: List[Dict[str, Any]] = [] # {id, title, type, last_used} self.selected_targets: List[int] = [] self.thread_count: int = 3 self.active_filters: List[str] = [] # 'images', 'videos', etc. or custom extensions self.max_files: Optional[int] = None self.duplicate_prevention: bool = False self.duplicate_method: str = 'hash' # 'hash' or 'filename_size' self.welcome_template: str = "Welcome! You are admin. Channels: {ADMIN_CHANNELS_COUNT}" self.photo_mode: str = 'welcome' # 'welcome', 'all', 'disabled' self.bot_photo_url: Optional[str] = None self.scheduled_jobs: List[Dict[str, Any]] = [] self.operation_logs: List[Dict[str, Any]] = [] self.android_help_text: str = self._default_help_text() # RAM cache (not persisted) self.cache: Dict[str, Any] = { 'scan_results': {}, # path -> {files, folders, timestamp} 'file_lists': {}, # path -> [file_info] 'pagination': {}, # chat_id -> {page, path, items_per_page} 'nav_stack': {}, # chat_id -> [path_history] 'upload_progress': {}, # chat_id -> upload_state 'message_ids': {}, # chat_id -> message_id 'pending_selections': {}, # chat_id -> {files, selected_indices} } # Runtime state self.upload_paused: Dict[int, bool] = {} self.upload_cancelled: Dict[int, bool] = {} self.active_uploads: Dict[int, threading.Event] = {} self.network_online: bool = True self._load() def _default_help_text(self) -> str: return """πŸ“± **Android File Access Guide** **For Android 12+ (API 31+):** 1. **Grant Permissions:** - Go to Settings β†’ Apps β†’ Pydroid 3 β†’ Permissions - Enable "Files and media" β†’ "Allow all the time" - Or "All files access" if available 2. **Common Paths:** - `/storage/emulated/0/` - Internal storage - `/storage/emulated/0/Download/` - Downloads - `/storage/emulated/0/Documents/` - Documents - `/sdcard/` - Symlink to internal storage 3. **Using Pydroid 3:** - Use the file browser in Pydroid to find your path - Or run: `os.listdir('/storage/emulated/0/')` in Python console 4. **Troubleshooting:** - If permission denied: Restart Pydroid 3 after granting permissions - Try: `os.system('termux-setup-storage')` if using Termux **Video Tutorial:** https://youtu.be/example (placeholder)""" def _load(self): """Load state from persistent storage.""" if PERSISTENCE_FILE.exists(): try: with open(PERSISTENCE_FILE, 'r', encoding='utf-8') as f: data = json.load(f) self.admin_id = data.get('admin_id') self.admin_settings = data.get('admin_settings', {}) self.channels = data.get('channels', []) self.selected_targets = data.get('selected_targets', []) self.thread_count = data.get('thread_count', 3) self.active_filters = data.get('active_filters', []) self.max_files = data.get('max_files') self.duplicate_prevention = data.get('duplicate_prevention', False) self.duplicate_method = data.get('duplicate_method', 'hash') self.welcome_template = data.get('welcome_template', self.welcome_template) self.photo_mode = data.get('photo_mode', 'welcome') self.bot_photo_url = data.get('bot_photo_url') self.scheduled_jobs = data.get('scheduled_jobs', []) self.operation_logs = data.get('operation_logs', []) self.android_help_text = data.get('android_help_text', self.android_help_text) logger.info(f"Loaded state. Admin: {self.admin_id}") except Exception as e: logger.error(f"Failed to load state: {e}") def save(self): """Save state to persistent storage (thread-safe).""" with self._save_lock: data = { 'admin_id': self.admin_id, 'admin_settings': self.admin_settings, 'channels': self.channels, 'selected_targets': self.selected_targets, 'thread_count': self.thread_count, 'active_filters': self.active_filters, 'max_files': self.max_files, 'duplicate_prevention': self.duplicate_prevention, 'duplicate_method': self.duplicate_method, 'welcome_template': self.welcome_template, 'photo_mode': self.photo_mode, 'bot_photo_url': self.bot_photo_url, 'scheduled_jobs': self.scheduled_jobs, 'operation_logs': self.operation_logs[-1000:], # Keep last 1000 'android_help_text': self.android_help_text, } try: # Atomic write temp_file = PERSISTENCE_FILE.with_suffix('.tmp') with open(temp_file, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) temp_file.replace(PERSISTENCE_FILE) except Exception as e: logger.error(f"Failed to save state: {e}") # Cache operations def cache_get(self, key: str, default=None): with self._cache_lock: return self.cache.get(key, default) def cache_set(self, key: str, value: Any): with self._cache_lock: self.cache[key] = value def cache_delete(self, key: str): with self._cache_lock: self.cache.pop(key, None) def get_chat_cache(self, chat_id: int, key: str, default=None): with self._cache_lock: return self.cache.get(key, {}).get(chat_id, default) def set_chat_cache(self, chat_id: int, key: str, value: Any): with self._cache_lock: if key not in self.cache: self.cache[key] = {} self.cache[key][chat_id] = value # Convenience methods def is_admin(self, user_id: int) -> bool: return self.admin_id == user_id def add_channel(self, channel_id: int, title: str, chat_type: str): # Remove if exists self.channels = [c for c in self.channels if c['id'] != channel_id] # Add to front self.channels.insert(0, { 'id': channel_id, 'title': title, 'type': chat_type, 'last_used': datetime.now().isoformat() }) self.save() def update_channel_order(self, channel_id: int): for ch in self.channels: if ch['id'] == channel_id: ch['last_used'] = datetime.now().isoformat() break self.channels.sort(key=lambda x: x['last_used'], reverse=True) self.save() def log_operation(self, operation_type: str, details: Dict): self.operation_logs.append({ 'type': operation_type, 'timestamp': datetime.now().isoformat(), 'details': details }) self.save() # Global state instance state = BotState() # ============================================================================= # THREAD POOLS AND QUEUES # ============================================================================= class ThreadManager: """Manages all thread pools with proper prioritization.""" def __init__(self): self.ui_executor = ThreadPoolExecutor(max_workers=UI_WORKERS, thread_name_prefix='ui') self.scan_executor = ThreadPoolExecutor(max_workers=SCAN_WORKERS, thread_name_prefix='scan') self.upload_executor = ThreadPoolExecutor(max_workers=UPLOAD_WORKERS, thread_name_prefix='upload') self.scheduler_executor = ThreadPoolExecutor(max_workers=SCHEDULER_WORKERS, thread_name_prefix='scheduler') self.ui_queue = queue.PriorityQueue() self.scan_queue = queue.Queue() self.upload_queue = queue.Queue() self._shutdown_event = threading.Event() self._stats_lock = threading.Lock() self._stats = { 'ui_tasks': 0, 'scan_tasks': 0, 'upload_tasks': 0, 'active_threads': 0 } self._start_workers() def _start_workers(self): """Start background worker threads.""" # UI priority worker threading.Thread(target=self._ui_worker, daemon=True, name='ui-priority').start() # Scan worker threading.Thread(target=self._scan_worker, daemon=True, name='scan-bg').start() def _ui_worker(self): """Process high-priority UI tasks.""" while not self._shutdown_event.is_set(): try: priority, task, callback = self.ui_queue.get(timeout=1) with self._stats_lock: self._stats['ui_tasks'] += 1 try: result = task() if callback: callback(result) except Exception as e: logger.error(f"UI task error: {e}") finally: with self._stats_lock: self._stats['ui_tasks'] -= 1 except queue.Empty: continue def _scan_worker(self): """Process background scan tasks.""" while not self._shutdown_event.is_set(): try: task, callback = self.scan_queue.get(timeout=1) with self._stats_lock: self._stats['scan_tasks'] += 1 try: result = task() if callback: callback(result) except Exception as e: logger.error(f"Scan task error: {e}") finally: with self._stats_lock: self._stats['scan_tasks'] -= 1 except queue.Empty: continue def submit_ui(self, task: Callable, callback: Optional[Callable] = None, priority: int = 5): """Submit high-priority UI task.""" self.ui_queue.put((priority, task, callback)) def submit_scan(self, task: Callable, callback: Optional[Callable] = None): """Submit background scan task.""" self.scan_queue.put((task, callback)) def submit_upload(self, task: Callable) -> Any: """Submit upload task to executor.""" with self._stats_lock: self._stats['upload_tasks'] += 1 future = self.upload_executor.submit(self._wrap_upload, task) return future def _wrap_upload(self, task: Callable): """Wrap upload task with stats tracking.""" try: return task() finally: with self._stats_lock: self._stats['upload_tasks'] -= 1 def get_stats(self) -> Dict[str, int]: with self._stats_lock: stats = dict(self._stats) stats['active_threads'] = threading.active_count() return stats def shutdown(self): """Graceful shutdown of all executors.""" self._shutdown_event.set() self.ui_executor.shutdown(wait=True) self.scan_executor.shutdown(wait=True) self.upload_executor.shutdown(wait=True) self.scheduler_executor.shutdown(wait=True) thread_manager = ThreadManager() # ============================================================================= # FILESYSTEM OPERATIONS # ============================================================================= class FileScanner: """High-performance file scanning with caching.""" CACHE_TTL = 30 # seconds @staticmethod def get_android_paths() -> List[str]: """Get common Android storage paths.""" paths = [] candidates = [ '/storage/emulated/0/', '/sdcard/', '/storage/self/primary/', os.path.expanduser('~/storage/shared/'), # Termux ] for p in candidates: if os.path.isdir(p) and os.access(p, os.R_OK): paths.append(p) return paths or ['/'] @classmethod def scan_directory(cls, path: str, force_refresh: bool = False) -> Dict[str, Any]: """Scan directory with caching.""" cache_key = f"scan:{path}" cached = state.cache_get('scan_results', {}).get(cache_key) if not force_refresh and cached: age = time.time() - cached.get('timestamp', 0) if age < cls.CACHE_TTL: return cached['data'] result = {'folders': [], 'files': [], 'error': None} try: entries = os.scandir(path) for entry in entries: try: stat = entry.stat(follow_symlinks=False) info = { 'name': entry.name, 'path': entry.path, 'size': stat.st_size, 'mtime': stat.st_mtime, } if entry.is_dir(follow_symlinks=False): info['type'] = 'folder' result['folders'].append(info) else: info['type'] = 'file' info['ext'] = os.path.splitext(entry.name)[1].lower() result['files'].append(info) except (OSError, PermissionError): continue # Sort result['folders'].sort(key=lambda x: x['name'].lower()) result['files'].sort(key=lambda x: x['name'].lower()) except PermissionError as e: result['error'] = f"Permission denied: {e}" except Exception as e: result['error'] = str(e) # Update cache scan_cache = state.cache_get('scan_results', {}) scan_cache[cache_key] = { 'data': result, 'timestamp': time.time() } state.cache_set('scan_results', scan_cache) return result @classmethod def get_filtered_files(cls, path: str, filters: List[str], max_files: Optional[int] = None) -> List[Dict]: """Get files matching filters with background pre-caching.""" cache_key = f"files:{path}:{sorted(filters)}:{max_files}" # Check cache file_cache = state.cache_get('file_lists', {}) if cache_key in file_cache: return file_cache[cache_key] # Build extension set from filters extensions = set() for f in filters: f_lower = f.lower() if f_lower in FILE_TYPES: extensions.update(FILE_TYPES[f_lower]) else: # Custom extension ext = f_lower if f_lower.startswith('.') else f'.{f_lower}' extensions.add(ext) # Scan all subdirectories all_files = [] scan_result = cls.scan_directory(path) def collect_files(current_path: str, depth: int = 0): if max_files and len(all_files) >= max_files: return res = cls.scan_directory(current_path) for f in res['files']: if max_files and len(all_files) >= max_files: break if not extensions or f['ext'] in extensions: all_files.append(f) # Queue background scans for subfolders for folder in res['folders']: if max_files and len(all_files) >= max_files: break if depth < 3: # Limit recursion depth for UI responsiveness collect_files(folder['path'], depth + 1) collect_files(path) # Cache result file_cache[cache_key] = all_files state.cache_set('file_lists', file_cache) return all_files @staticmethod def compute_hash(filepath: str, algorithm: str = 'blake2b', chunk_size: int = 8192) -> str: """Compute streaming hash for large files.""" hash_obj = hashlib.new(algorithm, digest_size=16) try: with open(filepath, 'rb') as f: while chunk := f.read(chunk_size): hash_obj.update(chunk) return hash_obj.hexdigest() except Exception: return '' @staticmethod def get_file_fingerprint(filepath: str, method: str = 'hash') -> str: """Get unique fingerprint for duplicate detection.""" if method == 'hash': return FileScanner.compute_hash(filepath) else: try: stat = os.stat(filepath) return f"{os.path.basename(filepath)}:{stat.st_size}" except: return '' # ============================================================================= # TELEGRAM BOT UI # ============================================================================= class UIManager: """Manages single-message UI updates with proper state tracking.""" def __init__(self, bot: telebot.TeleBot): self.bot = bot self._edit_locks: Dict[int, threading.RLock] = {} self._last_text: Dict[int, str] = {} def _get_lock(self, chat_id: int) -> threading.RLock: if chat_id not in self._edit_locks: self._edit_locks[chat_id] = threading.RLock() return self._edit_locks[chat_id] def ensure_message(self, chat_id: int, text: str, markup: Optional[InlineKeyboardMarkup] = None) -> int: """Ensure a message exists, create if needed.""" with self._get_lock(chat_id): msg_id = state.get_chat_cache(chat_id, 'message_ids') if msg_id: try: self.bot.edit_message_text( text, chat_id, msg_id, reply_markup=markup, parse_mode='Markdown', disable_web_page_preview=True ) return msg_id except ApiTelegramException as e: if 'message is not modified' in str(e).lower(): return msg_id # Message deleted or other error, create new pass # Create new message try: msg = self.bot.send_message( chat_id, text, reply_markup=markup, parse_mode='Markdown', disable_web_page_preview=True ) state.set_chat_cache(chat_id, 'message_ids', msg.message_id) return msg.message_id except Exception as e: logger.error(f"Failed to send message: {e}") return 0 def update_message(self, chat_id: int, text: str, markup: Optional[InlineKeyboardMarkup] = None) -> bool: """Update existing message, avoiding duplicates.""" with self._get_lock(chat_id): # Avoid identical updates cache_key = f"{chat_id}:last_text" if self._last_text.get(cache_key) == text and not markup: return True msg_id = state.get_chat_cache(chat_id, 'message_ids') if not msg_id: self.ensure_message(chat_id, text, markup) return True try: self.bot.edit_message_text( text, chat_id, msg_id, reply_markup=markup, parse_mode='Markdown', disable_web_page_preview=True ) self._last_text[cache_key] = text return True except ApiTelegramException as e: error = str(e).lower() if 'message is not modified' in error: return True if 'message to edit not found' in error: # Recreate state.set_chat_cache(chat_id, 'message_ids', None) return self.ensure_message(chat_id, text, markup) > 0 logger.warning(f"Edit failed: {e}") return False except Exception as e: logger.error(f"Update error: {e}") return False def delete_message(self, chat_id: int): """Delete tracked message.""" with self._get_lock(chat_id): msg_id = state.get_chat_cache(chat_id, 'message_ids') if msg_id: try: self.bot.delete_message(chat_id, msg_id) except: pass state.set_chat_cache(chat_id, 'message_ids', None) # ============================================================================= # KEYBOARD BUILDERS # ============================================================================= class Keyboards: """Build all inline keyboards.""" @staticmethod def main_menu() -> InlineKeyboardMarkup: kb = InlineKeyboardMarkup(row_width=2) kb.add( InlineKeyboardButton("πŸ“ Choose Folder", callback_data='menu:folder'), InlineKeyboardButton("πŸ“’ Channels", callback_data='menu:channels') ) kb.add( InlineKeyboardButton("πŸš€ Start Upload", callback_data='upload:start'), InlineKeyboardButton("⏰ Schedule", callback_data='menu:schedule') ) kb.add( InlineKeyboardButton("πŸ§ͺ Test Upload", callback_data='test:single'), InlineKeyboardButton("πŸ“Š Operations", callback_data='menu:operations') ) kb.add( InlineKeyboardButton("βš™οΈ Settings", callback_data='menu:settings'), InlineKeyboardButton("❓ Android Help", callback_data='help:android') ) kb.add( InlineKeyboardButton("πŸ‘¨β€πŸ’» Developer", url='https://t.me/e_d_w'), InlineKeyboardButton("πŸ›‘ Stop All", callback_data='system:stop') ) return kb @staticmethod def folder_browser(path: str, page: int, total_pages: int) -> InlineKeyboardMarkup: kb = InlineKeyboardMarkup(row_width=1) # Current path indicator display_path = path if len(path) < 40 else '...' + path[-37:] kb.add(InlineKeyboardButton(f"πŸ“‚ {display_path}", callback_data='noop')) # Get folders for this page scan = FileScanner.scan_directory(path) folders = scan['folders'] start = page * FOLDERS_PER_PAGE end = start + FOLDERS_PER_PAGE page_folders = folders[start:end] for folder in page_folders: name = folder['name'][:30] + '...' if len(folder['name']) > 30 else folder['name'] kb.add(InlineKeyboardButton(f"πŸ“ {name}", callback_data=f'folder:enter:{folder["path"]}')) # Pagination nav_row = [] if page > 0: nav_row.append(InlineKeyboardButton("⬅️ Prev", callback_data=f'folder:page:{page-1}')) nav_row.append(InlineKeyboardButton(f"{page+1}/{max(1, total_pages)}", callback_data='noop')) if page < total_pages - 1: nav_row.append(InlineKeyboardButton("Next ➑️", callback_data=f'folder:page:{page+1}')) if nav_row: kb.add(*nav_row) # Action buttons kb.add( InlineKeyboardButton("βœ… Select This Folder", callback_data=f'folder:select:{path}'), InlineKeyboardButton("⬆️ Up", callback_data='folder:up'), InlineKeyboardButton("🏠 Home", callback_data='menu:main') ) return kb @staticmethod def channel_selector(channels: List[Dict], selected: List[int]) -> InlineKeyboardMarkup: kb = InlineKeyboardMarkup(row_width=1) for ch in channels[:20]: # Limit display prefix = "βœ… " if ch['id'] in selected else "⬜ " name = f"{prefix}{ch['title'][:40]}" kb.add(InlineKeyboardButton(name, callback_data=f'channel:toggle:{ch["id"]}')) kb.add( InlineKeyboardButton("πŸ”„ Refresh List", callback_data='channel:refresh'), InlineKeyboardButton("βœ… Done", callback_data='menu:main') ) return kb @staticmethod def upload_controls() -> InlineKeyboardMarkup: kb = InlineKeyboardMarkup(row_width=3) kb.add( InlineKeyboardButton("⏸️ Pause", callback_data='upload:pause'), InlineKeyboardButton("▢️ Resume", callback_data='upload:resume'), InlineKeyboardButton("❌ Cancel", callback_data='upload:cancel') ) return kb @staticmethod def file_selector(files: List[Dict], selected: set, page: int) -> InlineKeyboardMarkup: kb = InlineKeyboardMarkup(row_width=1) per_page = 10 start = page * per_page end = start + per_page page_files = files[start:end] for i, f in enumerate(page_files, start): prefix = "βœ… " if i in selected else "⬜ " size = Uploader.format_size(f['size']) name = f"{prefix}{f['name'][:25]} ({size})" kb.add(InlineKeyboardButton(name, callback_data=f'file:toggle:{i}')) # Pagination total_pages = (len(files) + per_page - 1) // per_page nav = [] if page > 0: nav.append(InlineKeyboardButton("⬅️", callback_data=f'file:page:{page-1}')) nav.append(InlineKeyboardButton(f"{page+1}/{total_pages}", callback_data='noop')) if page < total_pages - 1: nav.append(InlineKeyboardButton("➑️", callback_data=f'file:page:{page+1}')) kb.add(*nav) kb.add( InlineKeyboardButton(f"πŸš€ Upload {len(selected)} files", callback_data='upload:confirmed'), InlineKeyboardButton("🏠 Cancel", callback_data='menu:main') ) return kb @staticmethod def settings_menu() -> InlineKeyboardMarkup: kb = InlineKeyboardMarkup(row_width=1) kb.add( InlineKeyboardButton("πŸ”§ Thread Count", callback_data='set:threads'), InlineKeyboardButton("🎚️ Max Files Limit", callback_data='set:maxfiles'), InlineKeyboardButton("πŸ” Filters", callback_data='set:filters'), InlineKeyboardButton("πŸ”„ Duplicate Prevention", callback_data='set:duplicate'), InlineKeyboardButton("πŸ“ Welcome Template", callback_data='set:welcome'), InlineKeyboardButton("πŸ–ΌοΈ Photo Settings", callback_data='set:photo'), InlineKeyboardButton("⬅️ Back", callback_data='menu:main') ) return kb # ============================================================================= # UPLOAD MANAGER # ============================================================================= class Uploader: """Handles all upload operations with resume, retry, and progress tracking.""" def __init__(self, bot: telebot.TeleBot, ui: UIManager): self.bot = bot self.ui = ui self._speed_window = deque(maxlen=10) # For moving average self._uploaded_hashes: set = set() # Session-level dedup @staticmethod def format_size(size: int) -> str: for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size < 1024: return f"{size:.1f}{unit}" size /= 1024 return f"{size:.1f}PB" @staticmethod def format_time(seconds: float) -> str: if seconds < 60: return f"{int(seconds)}s" elif seconds < 3600: return f"{int(seconds/60)}m{int(seconds%60)}s" else: return f"{int(seconds/3600)}h{int((seconds%3600)/60)}m" def _update_progress(self, chat_id: int, state_dict: Dict): """Update progress UI.""" current = state_dict['current'] total = state_dict['total'] current_file = state_dict.get('current_file', 'Unknown') # Calculate stats elapsed = time.time() - state_dict['start_time'] speed = state_dict.get('bytes_uploaded', 0) / max(elapsed, 0.001) self._speed_window.append(speed) avg_speed = sum(self._speed_window) / len(self._speed_window) if self._speed_window else speed remaining_bytes = state_dict.get('total_bytes', 0) - state_dict.get('bytes_uploaded', 0) eta = remaining_bytes / avg_speed if avg_speed > 0 else 0 # Build status text status = "⏸️ PAUSED" if state.upload_paused.get(chat_id) else "πŸš€ UPLOADING" if state.upload_cancelled.get(chat_id): status = "❌ CANCELLING" text = f"""{status} πŸ“„ `{current_file[:40]}` πŸ“Š Progress: {current}/{total} ({100*current//total}% if total > 0 else 0) βœ… Done: {state_dict.get('completed', 0)} | ⏳ Remaining: {total - current} πŸ”„ Retries: {state_dict.get('retries', 0)} | 🚫 Duplicates: {state_dict.get('duplicates', 0)} ⚑ Speed: {self.format_size(avg_speed)}/s | ⏱️ ETA: {self.format_time(eta)} Types: πŸ“·{state_dict.get('images', 0)} 🎬{state_dict.get('videos', 0)} 🎡{state_dict.get('audios', 0)} πŸ“„{state_dict.get('docs', 0)}""" # Check if we should update (throttle to avoid rate limits) last_update = state_dict.get('last_ui_update', 0) if time.time() - last_update > 1.5: # Update every 1.5s max self.ui.update_message(chat_id, text, Keyboards.upload_controls()) state_dict['last_ui_update'] = time.time() def _upload_file(self, filepath: str, targets: List[int], chat_id: int, state_dict: Dict) -> bool: """Upload single file with retry logic.""" filename = os.path.basename(filepath) ext = os.path.splitext(filename)[1].lower() # Determine file type file_type = 'other' for t, exts in FILE_TYPES.items(): if ext in exts: file_type = t break # Duplicate check if state.duplicate_prevention: fingerprint = FileScanner.get_file_fingerprint(filepath, state.duplicate_method) if fingerprint in self._uploaded_hashes: state_dict['duplicates'] = state_dict.get('duplicates', 0) + 1 state.log_operation('duplicate_skipped', {'file': filepath, 'method': state.duplicate_method}) return True self._uploaded_hashes.add(fingerprint) # Determine send method is_video = file_type == 'videos' for attempt in range(MAX_RETRIES): if state.upload_cancelled.get(chat_id): return False # Wait if paused while state.upload_paused.get(chat_id): if state.upload_cancelled.get(chat_id): return False time.sleep(0.5) try: file_size = os.path.getsize(filepath) # Use sendDocument for videos to preserve quality (no compression) # or sendVideo if we want Telegram to generate thumbnail with open(filepath, 'rb') as f: for target in targets: if is_video: # sendDocument preserves original quality self.bot.send_document(target, f, caption=filename[:1024]) else: self.bot.send_document(target, f, caption=filename[:1024]) f.seek(0) # Update stats state_dict['completed'] = state_dict.get('completed', 0) + 1 state_dict['bytes_uploaded'] = state_dict.get('bytes_uploaded', 0) + file_size # Update type counters type_key = file_type if file_type in ['images', 'videos', 'audios'] else 'docs' state_dict[type_key] = state_dict.get(type_key, 0) + 1 return True except Exception as e: logger.warning(f"Upload attempt {attempt+1} failed for {filename}: {e}") state_dict['retries'] = state_dict.get('retries', 0) + 1 if attempt < MAX_RETRIES - 1: delay = RETRY_DELAYS[attempt] logger.info(f"Retrying in {delay}s...") time.sleep(delay) else: state_dict['failed'] = state_dict.get('failed', []) + [filepath] return False return False def start_upload(self, chat_id: int, files: List[Dict], targets: List[int], operation_name: str): """Start upload process with full controls.""" # Reset control flags state.upload_paused[chat_id] = False state.upload_cancelled[chat_id] = False state.active_uploads[chat_id] = threading.Event() total = len(files) total_bytes = sum(f.get('size', 0) for f in files) upload_state = { 'current': 0, 'total': total, 'completed': 0, 'start_time': time.time(), 'bytes_uploaded': 0, 'total_bytes': total_bytes, 'retries': 0, 'duplicates': 0, 'failed': [], 'images': 0, 'videos': 0, 'audios': 0, 'docs': 0, 'last_ui_update': 0, 'operation_name': operation_name, } # Initial UI self.ui.update_message(chat_id, f"πŸš€ Starting upload: {operation_name}\n\nPreparing {total} files...", Keyboards.upload_controls()) # Process uploads for i, file_info in enumerate(files): upload_state['current'] = i + 1 upload_state['current_file'] = file_info['name'] self._update_progress(chat_id, upload_state) success = self._upload_file(file_info['path'], targets, chat_id, upload_state) if not success and state.upload_cancelled.get(chat_id): break # Final report self._send_report(chat_id, upload_state, files, targets) # Cleanup state.active_uploads.pop(chat_id, None) state.upload_paused.pop(chat_id, None) state.upload_cancelled.pop(chat_id, None) def _send_report(self, chat_id: int, upload_state: Dict, files: List[Dict], targets: List[int]): """Generate and send detailed report.""" elapsed = time.time() - upload_state['start_time'] report = f"""βœ… Upload Complete: {upload_state['operation_name']} πŸ“Š Summary: β€’ Total files: {upload_state['total']} β€’ Successful: {upload_state['completed']} β€’ Failed: {len(upload_state['failed'])} β€’ Duplicates skipped: {upload_state['duplicates']} β€’ Retries: {upload_state['retries']} πŸ“ By Type: β€’ Images: {upload_state['images']} β€’ Videos: {upload_state['videos']} β€’ Audio: {upload_state['audios']} β€’ Documents: {upload_state['docs']} ⚑ Performance: β€’ Time: {self.format_time(elapsed)} β€’ Avg speed: {self.format_size(upload_state['bytes_uploaded']/max(elapsed,0.001))}/s β€’ Data uploaded: {self.format_size(upload_state['bytes_uploaded'])}""" if upload_state['failed']: failed_list = '\n'.join(f"β€’ {os.path.basename(f)}" for f in upload_state['failed'][:10]) report += f"\n\n❌ Failed files:\n{failed_list}" if len(upload_state['failed']) > 10: report += f"\n... and {len(upload_state['failed'])-10} more" # Send to admin self.ui.update_message(chat_id, report, Keyboards.main_menu()) # Send to targets as document if long if len(report) > 4000: # Create report file report_path = DATA_DIR / f"report_{datetime.now():%Y%m%d_%H%M%S}.txt" with open(report_path, 'w', encoding='utf-8') as f: f.write(report) f.write("\n\nDetailed file list:\n") for fi in files: status = "βœ…" if fi['path'] not in upload_state['failed'] else "❌" f.write(f"{status} {fi['path']} ({self.format_size(fi['size'])})\n") for target in targets: try: with open(report_path, 'rb') as f: self.bot.send_document(target, f, caption=f"Upload report: {upload_state['operation_name']}") except Exception as e: logger.error(f"Failed to send report to {target}: {e}") report_path.unlink(missing_ok=True) # Log operation state.log_operation('upload_complete', { 'name': upload_state['operation_name'], 'total': upload_state['total'], 'completed': upload_state['completed'], 'failed': len(upload_state['failed']), 'duration': elapsed }) # ============================================================================= # SCHEDULER # ============================================================================= class JobScheduler: """Manages scheduled upload jobs.""" def __init__(self, bot: telebot.TeleBot, uploader: Uploader, ui: UIManager): self.bot = bot self.uploader = uploader self.ui = ui self._running = True self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() def _run(self): """Main scheduler loop.""" while self._running: try: now = datetime.now() for job in list(state.scheduled_jobs): scheduled_time = datetime.fromisoformat(job['scheduled_time']) if scheduled_time <= now and not job.get('started'): self._execute_job(job) time.sleep(10) # Check every 10 seconds except Exception as e: logger.error(f"Scheduler error: {e}") time.sleep(30) def _execute_job(self, job: Dict): """Execute scheduled job.""" job['started'] = True state.save() chat_id = job['chat_id'] # Notify start self.ui.update_message(chat_id, f"⏰ Scheduled job starting: {job['name']}") # Execute upload thread_manager.submit_upload(lambda: self.uploader.start_upload( chat_id, job['files'], job['targets'], job['name'] )) # Mark completed job['completed'] = True job['completed_time'] = datetime.now().isoformat() state.save() def add_job(self, chat_id: int, name: str, scheduled_time: datetime, files: List[Dict], targets: List[int]) -> str: """Add new scheduled job.""" job_id = f"job_{int(time.time())}_{chat_id}" job = { 'id': job_id, 'chat_id': chat_id, 'name': name, 'scheduled_time': scheduled_time.isoformat(), 'created_time': datetime.now().isoformat(), 'files': files, 'targets': targets, 'started': False, 'completed': False, } state.scheduled_jobs.append(job) state.save() return job_id def cancel_job(self, job_id: str) -> bool: """Cancel pending job.""" for job in state.scheduled_jobs: if job['id'] == job_id and not job.get('started'): job['cancelled'] = True state.save() return True return False def get_pending_jobs(self, chat_id: Optional[int] = None) -> List[Dict]: """Get pending jobs.""" jobs = [j for j in state.scheduled_jobs if not j.get('started') and not j.get('cancelled')] if chat_id: jobs = [j for j in jobs if j['chat_id'] == chat_id] return jobs def shutdown(self): self._running = False # ============================================================================= # BOT INITIALIZATION # ============================================================================= def create_bot() -> telebot.TeleBot: """Create and configure bot.""" token = os.environ.get('BOT_TOKEN') if not token: # Try to load from file token_file = DATA_DIR / 'bot_token.txt' if token_file.exists(): token = token_file.read_text().strip() if not token: print("ERROR: BOT_TOKEN not set!") print("Please set BOT_TOKEN environment variable or create bot_data/bot_token.txt") sys.exit(1) bot = telebot.TeleBot(token, parse_mode='Markdown', threaded=True) return bot # ============================================================================= # MESSAGE HANDLERS # ============================================================================= def setup_handlers(bot: telebot.TeleBot, ui: UIManager, uploader: Uploader, scheduler: JobScheduler): """Set up all bot handlers.""" # ========== COMMAND HANDLERS ========== @bot.message_handler(commands=['start']) def handle_start(message): user_id = message.from_user.id # Admin assignment if state.admin_id is None: state.admin_id = user_id state.save() logger.info(f"Admin assigned: {user_id}") welcome = f"""πŸŽ‰ **You are now the Admin!** Your Telegram ID: `{user_id}` This bot manages files on your Android device. Use the menu below to get started.""" msg = bot.send_message(user_id, welcome, reply_markup=Keyboards.main_menu()) state.set_chat_cache(user_id, 'message_ids', msg.message_id) return # Existing admin if state.is_admin(user_id