From 9d365cccd1fe040b8e03c87879bc8406204aa867 Mon Sep 17 00:00:00 2001 From: Muhammad18557 Date: Sat, 5 Apr 2025 16:07:27 +0800 Subject: [PATCH 01/10] initiate cleanup --- telegram-bridge/__init__.py | 5 + telegram-bridge/config.py | 49 +++ telegram-bridge/main.py | 680 ++++++------------------------------ 3 files changed, 156 insertions(+), 578 deletions(-) create mode 100644 telegram-bridge/__init__.py create mode 100644 telegram-bridge/config.py diff --git a/telegram-bridge/__init__.py b/telegram-bridge/__init__.py new file mode 100644 index 0000000..b4cbc5f --- /dev/null +++ b/telegram-bridge/__init__.py @@ -0,0 +1,5 @@ +""" +Telegram Bridge. + +Provides a bridge between Telegram API and HTTP API for sending and receiving messages. +""" \ No newline at end of file diff --git a/telegram-bridge/config.py b/telegram-bridge/config.py new file mode 100644 index 0000000..e308321 --- /dev/null +++ b/telegram-bridge/config.py @@ -0,0 +1,49 @@ +""" +Configuration management for the Telegram bridge. + +Loads environment variables and provides configuration settings. +""" + +import os +import logging +from dotenv import load_dotenv + +# Load environment variables from .env file if it exists +load_dotenv() + +# Directory paths +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +STORE_DIR = os.path.join(BASE_DIR, "store") +os.makedirs(STORE_DIR, exist_ok=True) + +# Session and database files +SESSION_FILE = os.path.join(STORE_DIR, "telegram_session") +DB_PATH = os.path.join(STORE_DIR, "messages.db") + +# API credentials +API_ID = os.getenv("TELEGRAM_API_ID") +API_HASH = os.getenv("TELEGRAM_API_HASH") + +# Server configuration +HTTP_PORT = int(os.getenv("HTTP_PORT", "8081")) +HTTP_HOST = os.getenv("HTTP_HOST", "") + +# Logging configuration +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") +LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + +# Initialize logging +logging.basicConfig( + level=getattr(logging, LOG_LEVEL), + format=LOG_FORMAT, + handlers=[logging.StreamHandler()] +) +logger = logging.getLogger(__name__) + +# Validate required settings +if not API_ID or not API_HASH: + logger.error( + "TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables must be set" + ) + logger.error("Get them from https://my.telegram.org/auth") + raise ValueError("Missing API credentials") \ No newline at end of file diff --git a/telegram-bridge/main.py b/telegram-bridge/main.py index 69bb236..19f5ba1 100644 --- a/telegram-bridge/main.py +++ b/telegram-bridge/main.py @@ -1,610 +1,134 @@ """ -Telegram Bridge Module +Telegram Bridge main entry point. -This module connects to the Telegram API using the Telethon library, enabling -interaction with Telegram chats and messages. It provides an HTTP server for -sending messages to Telegram users or groups, and stores message history in -a SQLite database. - -Key Features: -- Connects to Telegram using API credentials. -- Stores chat and message data in a database. -- Provides an HTTP API for sending messages. -- Synchronizes message history and processes new messages. - -Usage: -- Ensure TELEGRAM_API_ID and TELEGRAM_API_HASH are set in environment variables. -- Run the script to start the Telegram bridge and HTTP server. -- Use the '/api/send' endpoint to send messages via HTTP requests. +This module initializes and runs the Telegram bridge application, connecting +to the Telegram API and providing a REST API server for interaction. """ -import os -import sqlite3 -import json import asyncio import logging import sys -from datetime import datetime -from typing import Dict, Any, Optional, List, Tuple -from http.server import HTTPServer, BaseHTTPRequestHandler -import threading -from dotenv import load_dotenv +import uvicorn -from telethon import TelegramClient, events -from telethon.tl.types import ( - User, - Chat, - Channel, - Message, - Dialog, -) -from telethon.utils import get_display_name +from config import API_ID, API_HASH, SESSION_FILE, HTTP_PORT, HTTP_HOST +from database import init_db, ChatRepository, MessageRepository +from api import TelegramApiClient, TelegramMiddleware +from service import TelegramService +from server import app, get_telegram_service -# Global variable to store the main event loop -main_loop = None - -# Load environment variables from .env file if it exists -load_dotenv() - -# Configure logging -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - handlers=[logging.StreamHandler(sys.stdout)], -) +# Initialize logger logger = logging.getLogger(__name__) -# Directory for storing data -STORE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "store") -os.makedirs(STORE_DIR, exist_ok=True) -# Database path -DB_PATH = os.path.join(STORE_DIR, "messages.db") - -# API credentials from environment variables -API_ID = os.getenv("TELEGRAM_API_ID") -API_HASH = os.getenv("TELEGRAM_API_HASH") - -if not API_ID or not API_HASH: - logger.error( - "TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables must be set" - ) - logger.error("Get them from https://my.telegram.org/auth") - sys.exit(1) - -# Initialize the Telegram client -SESSION_FILE = os.path.join(STORE_DIR, "telegram_session") -client = TelegramClient(SESSION_FILE, API_ID, API_HASH) - - -class MessageStore: - """Handles storage and retrieval of Telegram messages in SQLite.""" - - def __init__(self, db_path: str): - """Initialize the message store with the given database path.""" - self.db_path = db_path - self.init_db() - - def init_db(self): - """Initialize the database with necessary tables.""" - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() - - # Create tables if they don't exist - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS chats ( - id INTEGER PRIMARY KEY, - title TEXT, - username TEXT, - type TEXT, - last_message_time TIMESTAMP - ) - """ - ) - - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS messages ( - id INTEGER, - chat_id INTEGER, - sender_id INTEGER, - sender_name TEXT, - content TEXT, - timestamp TIMESTAMP, - is_from_me BOOLEAN, - PRIMARY KEY (id, chat_id), - FOREIGN KEY (chat_id) REFERENCES chats(id) - ) - """ - ) - - # Create indexes for efficient queries - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_messages_chat_id ON messages(chat_id)" - ) - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp)" - ) - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_messages_content ON messages(content)" - ) - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_messages_sender_id ON messages(sender_id)" - ) - - conn.commit() - conn.close() - - def store_chat( - self, - chat_id: int, - title: str, - username: Optional[str], - chat_type: str, - last_message_time: datetime, - ) -> None: - """Store a chat in the database.""" - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() - - cursor.execute( - "INSERT OR REPLACE INTO chats (id, title, username, type, last_message_time) VALUES (?, ?, ?, ?, ?)", - (chat_id, title, username, chat_type, last_message_time.isoformat()), - ) - - conn.commit() - conn.close() - - def store_message( - self, - message_id: int, - chat_id: int, - sender_id: int, - sender_name: str, - content: str, - timestamp: datetime, - is_from_me: bool, - ) -> None: - """Store a message in the database.""" - if not content: # Skip empty messages - return - - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() - - cursor.execute( - """INSERT OR REPLACE INTO messages - (id, chat_id, sender_id, sender_name, content, timestamp, is_from_me) - VALUES (?, ?, ?, ?, ?, ?, ?)""", - ( - message_id, - chat_id, - sender_id, - sender_name, - content, - timestamp.isoformat(), - is_from_me, - ), - ) - - conn.commit() - conn.close() - - def get_messages( - self, - chat_id: Optional[int] = None, - limit: int = 50, - query: Optional[str] = None, - offset: int = 0, - ) -> List[Dict[str, Any]]: - """Get messages from the database.""" - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() - - query_parts = [ - "SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id FROM messages m" - ] - query_parts.append("JOIN chats c ON m.chat_id = c.id") - - conditions = [] - params = [] - - if chat_id: - conditions.append("m.chat_id = ?") - params.append(chat_id) - - if query: - conditions.append("m.content LIKE ?") - params.append(f"%{query}%") - - if conditions: - query_parts.append("WHERE " + " AND ".join(conditions)) - - query_parts.append("ORDER BY m.timestamp DESC") - query_parts.append("LIMIT ? OFFSET ?") - params.extend([limit, offset]) - - cursor.execute(" ".join(query_parts), tuple(params)) - messages = cursor.fetchall() - - results = [] - for msg in messages: - timestamp = datetime.fromisoformat(msg[5]) - results.append( - { - "id": msg[0], - "chat_id": msg[1], - "chat_title": msg[2], - "sender_name": msg[3], - "content": msg[4], - "timestamp": timestamp, - "is_from_me": msg[6], - "sender_id": msg[7], - } - ) - - conn.close() - return results - - def get_chats( - self, limit: int = 50, query: Optional[str] = None - ) -> List[Dict[str, Any]]: - """Get chats from the database.""" - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() - - query_parts = ["SELECT id, title, username, type, last_message_time FROM chats"] - params = [] - - if query: - query_parts.append("WHERE title LIKE ? OR username LIKE ?") - params.extend([f"%{query}%", f"%{query}%"]) - - query_parts.append("ORDER BY last_message_time DESC") - query_parts.append("LIMIT ?") - params.append(limit) - - cursor.execute(" ".join(query_parts), tuple(params)) - chats = cursor.fetchall() - - results = [] - for chat in chats: - last_message_time = datetime.fromisoformat(chat[4]) if chat[4] else None - results.append( - { - "id": chat[0], - "title": chat[1], - "username": chat[2], - "type": chat[3], - "last_message_time": last_message_time, - } - ) - - conn.close() - return results - - -# Create message store -message_store = MessageStore(DB_PATH) - - -async def process_message(message: Message) -> None: - """Process and store a message.""" - if not message.text: - return # Skip non-text messages - - # Get the chat - chat = message.chat - if not chat: - return - - chat_id = message.chat_id - - # Determine chat type and name - if isinstance(chat, User): - chat_type = "user" - title = get_display_name(chat) - username = chat.username - elif isinstance(chat, Chat): - chat_type = "group" - title = chat.title - username = None - elif isinstance(chat, Channel): - chat_type = "channel" if chat.broadcast else "supergroup" - title = chat.title - username = chat.username - else: - logger.warning(f"Unknown chat type: {type(chat)}") - return - - # Store chat information - message_store.store_chat( - chat_id=chat_id, - title=title, - username=username, - chat_type=chat_type, - last_message_time=message.date, - ) - - # Get sender information - sender = await message.get_sender() - sender_id = sender.id if sender else 0 - sender_name = get_display_name(sender) if sender else "Unknown" - - # Check if the message is from the current user - my_id = (await client.get_me()).id - is_from_me = sender_id == my_id - - # Store the message - message_store.store_message( - message_id=message.id, - chat_id=chat_id, - sender_id=sender_id, - sender_name=sender_name, - content=message.text, - timestamp=message.date, - is_from_me=is_from_me, - ) - - logger.info( - f"Stored message: [{message.date}] {sender_name} in {title}: {message.text[:30]}..." - ) - - -async def sync_dialog_history(dialog: Dialog, limit: int = 100) -> None: - """Sync message history for a specific dialog.""" - chat_entity = dialog.entity - - # Extract chat info - if isinstance(chat_entity, User): - chat_type = "user" - title = get_display_name(chat_entity) - username = chat_entity.username - elif isinstance(chat_entity, Chat): - chat_type = "group" - title = chat_entity.title - username = None - elif isinstance(chat_entity, Channel): - chat_type = "channel" if chat_entity.broadcast else "supergroup" - title = chat_entity.title - username = chat_entity.username - else: - logger.warning(f"Unknown chat type: {type(chat_entity)}") - return - - # Store chat info with last message time - message_store.store_chat( - chat_id=dialog.id, - title=title, - username=username, - chat_type=chat_type, - last_message_time=dialog.date, - ) - - # Get messages - messages = await client.get_messages(dialog.entity, limit=limit) - - # Get current user ID - my_id = (await client.get_me()).id - - # Process each message - for message in messages: - if not message.text: - continue # Skip non-text messages - - # Get sender information - try: - sender = await message.get_sender() - sender_id = sender.id if sender else 0 - sender_name = get_display_name(sender) if sender else "Unknown" - except Exception as e: - logger.error(f"Error getting sender: {e}") - sender_id = 0 - sender_name = "Unknown" - - is_from_me = sender_id == my_id - - # Store the message - message_store.store_message( - message_id=message.id, - chat_id=dialog.id, - sender_id=sender_id, - sender_name=sender_name, - content=message.text, - timestamp=message.date, - is_from_me=is_from_me, - ) - - logger.info(f"Synced {len(messages)} messages from {title}") - - -async def sync_all_dialogs() -> None: - """Sync message history for all dialogs.""" - logger.info("Starting synchronization of all dialogs") - - # Get all dialogs (chats) - dialogs = await client.get_dialogs(limit=100) - - for dialog in dialogs: - try: - await sync_dialog_history(dialog) - except Exception as e: - logger.error(f"Error syncing dialog {dialog.name}: {e}") - - logger.info(f"Completed synchronization of {len(dialogs)} dialogs") - - -# HTTP server for API endpoints -class TelegramAPIHandler(BaseHTTPRequestHandler): - def do_POST(self): - content_length = int(self.headers["Content-Length"]) - post_data = self.rfile.read(content_length) - request = json.loads(post_data.decode("utf-8")) - - if self.path == "/api/send": - self._handle_send_message(request) - else: - self.send_error(404, "Endpoint not found") - - def _handle_send_message(self, request): - recipient = request.get("recipient") - message_text = request.get("message") - - if not recipient or not message_text: - self._send_json_response( - 400, {"success": False, "message": "Recipient and message are required"} - ) - return - - try: - # Instead of creating a new event loop, use a shared queue to communicate with the main thread - # Create a Future to hold the result - future = asyncio.run_coroutine_threadsafe( - send_message(recipient, message_text), main_loop - ) - - # Wait for the result (with timeout) - try: - success, message = future.result(10) # Wait up to 10 seconds - self._send_json_response( - 200 if success else 500, {"success": success, "message": message} - ) - except asyncio.TimeoutError: - self._send_json_response( - 504, - { - "success": False, - "message": "Request timed out while sending message", - }, - ) - except Exception as inner_e: - logger.error(f"Error in Future: {inner_e}") - self._send_json_response( - 500, - {"success": False, "message": f"Error in Future: {str(inner_e)}"}, - ) - - except Exception as e: - logger.error(f"Error sending message: {e}") - self._send_json_response( - 500, {"success": False, "message": f"Error: {str(e)}"} - ) - - def _send_json_response(self, status_code, data): - self.send_response(status_code) - self.send_header("Content-Type", "application/json") - self.end_headers() - self.wfile.write(json.dumps(data).encode("utf-8")) - - -async def send_message(recipient: str, message: str) -> Tuple[bool, str]: - """Send a message to a Telegram recipient.""" - if not client.is_connected(): - return False, "Not connected to Telegram" - - try: - # Try to parse recipient as an integer (chat ID) - try: - chat_id = int(recipient) - entity = await client.get_entity(chat_id) - except ValueError: - # Not an integer, try as username - if recipient.startswith("@"): - recipient = recipient[1:] # Remove @ if present - try: - entity = await client.get_entity(recipient) - except Exception: - # Try to find in database - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - cursor.execute( - "SELECT id FROM chats WHERE title LIKE ? OR username = ?", - (f"%{recipient}%", recipient), - ) - result = cursor.fetchone() - conn.close() - - if result: - entity = await client.get_entity(result[0]) - else: - return False, f"Recipient not found: {recipient}" - - # Send the message - await client.send_message(entity, message) - return True, f"Message sent to {recipient}" - - except Exception as e: - logger.error(f"Error sending message: {e}") - return False, f"Error sending message: {str(e)}" - - -# Start HTTP server in a separate thread -def start_http_server(port: int = 8081): - server_address = ("", port) - httpd = HTTPServer(server_address, TelegramAPIHandler) - logger.info(f"Starting HTTP server on port {port}") - httpd.serve_forever() - - -async def main(): - global main_loop - - # Store the current event loop - main_loop = asyncio.get_event_loop() - - logger.info("Starting Telegram bridge") - - # Connect to Telegram - await client.connect() - - # Check if we're already authorized - if not await client.is_user_authorized(): +# Global service instance +telegram_service = None + + +# Override the get_telegram_service function in the API app +def get_service_override(): + """Get the Telegram service singleton.""" + global telegram_service + if telegram_service is None: + raise RuntimeError("Telegram service not initialized") + return telegram_service + + +# Initialize the application +async def init_app(): + """Initialize the application components.""" + global telegram_service + + # Initialize database + init_db() + + # Create repositories + chat_repo = ChatRepository() + message_repo = MessageRepository() + + # Create API client + client = TelegramApiClient(SESSION_FILE, API_ID, API_HASH) + + # Create middleware + middleware = TelegramMiddleware(client) + + # Create service + telegram_service = TelegramService(client, middleware, chat_repo, message_repo) + + # Override the service getter in the API app + app.dependency_overrides[get_telegram_service] = get_service_override + + # Setup the service + await telegram_service.setup() + + # Return the service for further use + return telegram_service + + +async def login_flow(): + """Interactive login flow for Telegram.""" + global telegram_service + + if not await telegram_service.authorize(): logger.info("Need to log in. Please enter your phone number:") phone = input("Phone number: ") - await client.send_code_request(phone) + + # Send the code request + await telegram_service.client.send_code_request(phone) + logger.info("Code sent. Please enter the code you received:") code = input("Code: ") + try: - await client.sign_in(phone, code) + success = await telegram_service.login(phone, code) + if not success: + logger.error("Failed to log in with the provided code") + return False except Exception as e: logger.error(f"Error signing in: {e}") logger.info( "If you have two-factor authentication enabled, please enter your password:" ) password = input("Password: ") - await client.sign_in(password=password) - + success = await telegram_service.login(phone, code, password) + if not success: + logger.error("Failed to log in with the provided password") + return False + logger.info("Successfully logged in to Telegram") + return True - # Start HTTP server in a separate thread - server_thread = threading.Thread(target=start_http_server) - server_thread.daemon = True - server_thread.start() - # Register event handler for new messages - @client.on(events.NewMessage) - async def handle_new_message(event): - await process_message(event.message) - - # Initial sync of message history - await sync_all_dialogs() - - # Keep the script running - logger.info("Telegram bridge is running. Press Ctrl+C to exit.") +async def main(): + """Main application entry point.""" try: - while True: - await asyncio.sleep(1) - except KeyboardInterrupt: - pass - - -if __name__ == "__main__": - # Run the main function - try: - asyncio.run(main()) + # Initialize the application + global telegram_service + telegram_service = await init_app() + + # Login to Telegram if needed + if not await login_flow(): + logger.error("Failed to authenticate with Telegram") + return + + # Initial sync of message history + await telegram_service.sync_all_dialogs() + + # Start the API server + logger.info(f"Starting FastAPI server on {HTTP_HOST}:{HTTP_PORT}") + config = uvicorn.Config(app=app, host=HTTP_HOST, port=HTTP_PORT, log_level="info") + server = uvicorn.Server(config) + + # Keep the script running + logger.info("Telegram bridge is running. Press Ctrl+C to exit.") + await server.serve() + except KeyboardInterrupt: logger.info("Shutting down Telegram bridge") except Exception as e: logger.error(f"Unexpected error: {e}") sys.exit(1) + + +if __name__ == "__main__": + # Run the main function + asyncio.run(main()) From d8e7950b53adf2d3ce53a7fc50cfe856d8f8f1ff Mon Sep 17 00:00:00 2001 From: Muhammad18557 Date: Sat, 5 Apr 2025 16:07:35 +0800 Subject: [PATCH 02/10] update requirements --- requirements.txt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/requirements.txt b/requirements.txt index 040260a..2ae6dd7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,10 @@ telethon cryptg python-dotenv requests +sqlalchemy +pydantic +fastapi +uvicorn # MCP Server dependencies fastmcp From dd9015bc24036909e8eee7427a5700d249043f36 Mon Sep 17 00:00:00 2001 From: Muhammad18557 Date: Sat, 5 Apr 2025 16:08:31 +0800 Subject: [PATCH 03/10] telegram service --- telegram-bridge/service.py | 224 +++++++++++++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 telegram-bridge/service.py diff --git a/telegram-bridge/service.py b/telegram-bridge/service.py new file mode 100644 index 0000000..c203bae --- /dev/null +++ b/telegram-bridge/service.py @@ -0,0 +1,224 @@ +""" +Service layer for the Telegram bridge. + +Connects the API middleware with database repositories to provide +high-level operations for the application. +""" + +import logging +from typing import Optional, List, Dict, Any, Tuple +from datetime import datetime + +from telethon import events + +from api import TelegramApiClient, TelegramMiddleware +from database import ChatRepository, MessageRepository + +logger = logging.getLogger(__name__) + + +class TelegramService: + """Service for Telegram operations.""" + + def __init__( + self, + telegram_client: TelegramApiClient, + middleware: TelegramMiddleware, + chat_repo: ChatRepository, + message_repo: MessageRepository + ): + """Initialize the service. + + Args: + telegram_client: Telegram API client + middleware: Telegram middleware + chat_repo: Chat repository + message_repo: Message repository + """ + self.client = telegram_client + self.middleware = middleware + self.chat_repo = chat_repo + self.message_repo = message_repo + + async def setup(self) -> None: + """Set up the service, connect to Telegram, and register handlers.""" + # Connect to Telegram + await self.client.connect() + + # Register event handlers + self.client.add_event_handler(self._handle_new_message, events.NewMessage) + + async def authorize(self) -> bool: + """Authorize with Telegram if needed.""" + if await self.client.is_authorized(): + logger.info("Already authorized with Telegram") + return True + + logger.info("Not authorized with Telegram. Interactive login required.") + return False + + async def login(self, phone: str, code: str, password: Optional[str] = None) -> bool: + """Login to Telegram. + + Args: + phone: Phone number + code: Verification code + password: Two-factor authentication password (optional) + + Returns: + bool: True if login successful, False otherwise + """ + if password: + return await self.client.sign_in(phone=phone, code=code, password=password) + else: + # First send code request + await self.client.send_code_request(phone) + # Then sign in with the code + return await self.client.sign_in(phone=phone, code=code) + + async def sync_all_dialogs(self, limit: int = 100) -> None: + """Sync all dialogs (chats) from Telegram. + + Args: + limit: Maximum number of dialogs to retrieve + """ + logger.info("Starting synchronization of all dialogs") + + # Get all dialogs (chats) + dialogs = await self.client.get_dialogs(limit=limit) + + for dialog in dialogs: + try: + await self.sync_dialog_history(dialog) + except Exception as e: + logger.error(f"Error syncing dialog {dialog.name}: {e}") + + logger.info(f"Completed synchronization of {len(dialogs)} dialogs") + + async def sync_dialog_history(self, dialog, limit: int = 100) -> None: + """Sync message history for a specific dialog. + + Args: + dialog: Dialog to sync + limit: Maximum number of messages to retrieve + """ + # Process dialog entity + chat_info = await self.middleware.process_dialog(dialog) + + if not chat_info: + logger.warning(f"Could not process dialog: {dialog}") + return + + # Store chat information + self.chat_repo.store_chat( + chat_id=chat_info["id"], + title=chat_info["title"], + username=chat_info.get("username"), + chat_type=chat_info["type"], + last_message_time=chat_info["last_message_time"] + ) + + # Get messages + messages = await self.client.get_messages(dialog.entity, limit=limit) + + # Process each message + for message in messages: + msg_info = await self.middleware.process_message(message) + if msg_info: + self.message_repo.store_message( + message_id=msg_info["id"], + chat_id=msg_info["chat_id"], + sender_id=msg_info["sender_id"], + sender_name=msg_info["sender_name"], + content=msg_info["content"], + timestamp=msg_info["timestamp"], + is_from_me=msg_info["is_from_me"] + ) + + logger.info(f"Synced {len(messages)} messages from {chat_info['title']}") + + async def send_message(self, recipient: str, message: str) -> Tuple[bool, str]: + """Send a message to a Telegram recipient. + + Args: + recipient: Recipient identifier (ID, username, or title) + message: Message text to send + + Returns: + Tuple[bool, str]: Success status and message + """ + if not self.client.client.is_connected(): + return False, "Not connected to Telegram" + + entity = await self.middleware.find_entity_by_name_or_id(recipient) + + if not entity: + # Try to find in database + try: + # Try to parse as integer + chat_id = int(recipient) + chat = self.chat_repo.get_chat_by_id(chat_id) + if chat: + entity = await self.client.get_entity(chat_id) + except ValueError: + # Not an integer, try to find by name + chats = self.chat_repo.get_chats(query=recipient, limit=1) + if chats: + entity = await self.client.get_entity(chats[0].id) + + if not entity: + return False, f"Recipient not found: {recipient}" + + # Send the message + sent_message = await self.client.send_message(entity, message) + + if sent_message: + # Process and store the sent message + msg_info = await self.middleware.process_message(sent_message) + if msg_info: + self.message_repo.store_message( + message_id=msg_info["id"], + chat_id=msg_info["chat_id"], + sender_id=msg_info["sender_id"], + sender_name=msg_info["sender_name"], + content=msg_info["content"], + timestamp=msg_info["timestamp"], + is_from_me=msg_info["is_from_me"] + ) + return True, f"Message sent to {recipient}" + else: + return False, f"Failed to send message to {recipient}" + + async def _handle_new_message(self, event) -> None: + """Handle a new message event from Telegram.""" + message = event.message + msg_info = await self.middleware.process_message(message) + + if msg_info: + # Process and store chat information + chat_entity = message.chat + if chat_entity: + chat_info = await self.middleware.process_chat_entity(chat_entity) + self.chat_repo.store_chat( + chat_id=chat_info["id"], + title=chat_info["title"], + username=chat_info.get("username"), + chat_type=chat_info["type"], + last_message_time=message.date + ) + + # Store the message + self.message_repo.store_message( + message_id=msg_info["id"], + chat_id=msg_info["chat_id"], + sender_id=msg_info["sender_id"], + sender_name=msg_info["sender_name"], + content=msg_info["content"], + timestamp=msg_info["timestamp"], + is_from_me=msg_info["is_from_me"] + ) + + logger.info( + f"Stored message: [{msg_info['timestamp']}] {msg_info['sender_name']} " + f"in {msg_info['chat_title']}: {msg_info['content'][:30]}..." + ) \ No newline at end of file From 922d226719e74725318469ab1ff51c4902a18392 Mon Sep 17 00:00:00 2001 From: Muhammad18557 Date: Sat, 5 Apr 2025 16:13:36 +0800 Subject: [PATCH 04/10] api for telegram interaction --- telegram-bridge/api/__init__.py | 5 + telegram-bridge/api/client.py | 190 ++++++++++++++++++++++++++++++ telegram-bridge/api/middleware.py | 155 ++++++++++++++++++++++++ telegram-bridge/api/models.py | 49 ++++++++ 4 files changed, 399 insertions(+) create mode 100644 telegram-bridge/api/__init__.py create mode 100644 telegram-bridge/api/client.py create mode 100644 telegram-bridge/api/middleware.py create mode 100644 telegram-bridge/api/models.py diff --git a/telegram-bridge/api/__init__.py b/telegram-bridge/api/__init__.py new file mode 100644 index 0000000..0bb9df1 --- /dev/null +++ b/telegram-bridge/api/__init__.py @@ -0,0 +1,5 @@ +""" +API module for Telegram interaction. + +Provides client, middleware, and models for interacting with the Telegram API. +""" \ No newline at end of file diff --git a/telegram-bridge/api/client.py b/telegram-bridge/api/client.py new file mode 100644 index 0000000..5843a5c --- /dev/null +++ b/telegram-bridge/api/client.py @@ -0,0 +1,190 @@ +""" +Telegram API client. + +Handles communication with the Telegram API using the Telethon library. +""" + +import os +import logging +import asyncio +from typing import Optional, List, Dict, Any, Callable, Tuple +from datetime import datetime + +from telethon import TelegramClient +from telethon.tl.types import User, Chat, Channel, Message, Dialog +from telethon.utils import get_display_name + +logger = logging.getLogger(__name__) + + +class TelegramApiClient: + """Client for interacting with the Telegram API.""" + + def __init__(self, session_file: str, api_id: str, api_hash: str): + """Initialize the Telegram API client. + + Args: + session_file: Path to the session file for authentication + api_id: Telegram API ID + api_hash: Telegram API hash + """ + self.session_file = session_file + self.api_id = api_id + self.api_hash = api_hash + self.client = TelegramClient(session_file, api_id, api_hash) + self._me = None + + async def connect(self) -> bool: + """Connect to the Telegram API. + + Returns: + bool: True if successfully connected, False otherwise + """ + try: + await self.client.connect() + return True + except Exception as e: + logger.error(f"Failed to connect to Telegram: {e}") + return False + + async def is_authorized(self) -> bool: + """Check if the client is authorized. + + Returns: + bool: True if authorized, False otherwise + """ + return await self.client.is_user_authorized() + + async def send_code_request(self, phone: str) -> bool: + """Send a code request to the given phone number. + + Args: + phone: Phone number to send code to + + Returns: + bool: True if code sent successfully, False otherwise + """ + try: + await self.client.send_code_request(phone) + return True + except Exception as e: + logger.error(f"Failed to send code request: {e}") + return False + + async def sign_in(self, phone: Optional[str] = None, code: Optional[str] = None, + password: Optional[str] = None) -> bool: + """Sign in to Telegram. + + Args: + phone: Phone number (optional) + code: Verification code (optional) + password: Two-factor authentication password (optional) + + Returns: + bool: True if signed in successfully, False otherwise + """ + try: + if password: + await self.client.sign_in(password=password) + else: + await self.client.sign_in(phone, code) + return True + except Exception as e: + logger.error(f"Failed to sign in: {e}") + return False + + async def get_me(self) -> Optional[User]: + """Get the current user. + + Returns: + User: Current user object + """ + if not self._me: + self._me = await self.client.get_me() + return self._me + + async def get_dialogs(self, limit: int = 100) -> List[Dialog]: + """Get dialogs (chats) from Telegram. + + Args: + limit: Maximum number of dialogs to retrieve + + Returns: + List[Dialog]: List of dialogs + """ + try: + return await self.client.get_dialogs(limit=limit) + except Exception as e: + logger.error(f"Failed to get dialogs: {e}") + return [] + + async def get_entity(self, entity_id: Any) -> Optional[Any]: + """Get an entity from Telegram. + + Args: + entity_id: ID of the entity to retrieve + + Returns: + Any: Entity object (User, Chat, or Channel) + """ + try: + return await self.client.get_entity(entity_id) + except Exception as e: + logger.error(f"Failed to get entity: {e}") + return None + + async def get_messages(self, entity: Any, limit: int = 100) -> List[Message]: + """Get messages from a chat. + + Args: + entity: Chat entity + limit: Maximum number of messages to retrieve + + Returns: + List[Message]: List of messages + """ + try: + return await self.client.get_messages(entity, limit=limit) + except Exception as e: + logger.error(f"Failed to get messages: {e}") + return [] + + async def send_message(self, entity: Any, message: str) -> Optional[Message]: + """Send a message to a chat. + + Args: + entity: Chat entity + message: Message text to send + + Returns: + Message: Sent message object + """ + try: + return await self.client.send_message(entity, message) + except Exception as e: + logger.error(f"Failed to send message: {e}") + return None + + def add_event_handler(self, callback: Callable, event: Any) -> None: + """Add an event handler. + + Args: + callback: Callback function to handle the event + event: Event to handle + """ + self.client.add_event_handler(callback, event) + + async def disconnect(self) -> None: + """Disconnect from the Telegram API.""" + await self.client.disconnect() + + def __del__(self): + """Cleanup when the client is deleted.""" + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + loop.create_task(self.disconnect()) + else: + loop.run_until_complete(self.disconnect()) + except: + pass \ No newline at end of file diff --git a/telegram-bridge/api/middleware.py b/telegram-bridge/api/middleware.py new file mode 100644 index 0000000..5231cc9 --- /dev/null +++ b/telegram-bridge/api/middleware.py @@ -0,0 +1,155 @@ +""" +Telegram API middleware. + +Handles common operations between the application and Telegram API such as +authentication, error handling, and entity type conversion. +""" + +import logging +from typing import Dict, Any, Optional, Tuple, List, Callable +from datetime import datetime +from functools import wraps + +from telethon.tl.types import User, Chat, Channel, Message, Dialog +from telethon.utils import get_display_name + +from api.client import TelegramApiClient + +logger = logging.getLogger(__name__) + + +def handle_telegram_errors(func): + """Decorator to handle Telegram API errors.""" + @wraps(func) + async def wrapper(*args, **kwargs): + try: + return await func(*args, **kwargs) + except Exception as e: + logger.error(f"Telegram API error in {func.__name__}: {e}") + return None + return wrapper + + +class TelegramMiddleware: + """Middleware for Telegram API operations.""" + + def __init__(self, client: TelegramApiClient): + """Initialize the middleware with a Telegram client. + + Args: + client: Initialized Telegram API client + """ + self.client = client + + async def process_chat_entity(self, entity: Any) -> Dict[str, Any]: + """Process a chat entity and convert it to a dictionary. + + Args: + entity: Chat entity from Telegram API + + Returns: + Dict: Standardized chat representation + """ + if isinstance(entity, User): + chat_type = "user" + title = get_display_name(entity) + username = entity.username + elif isinstance(entity, Chat): + chat_type = "group" + title = entity.title + username = None + elif isinstance(entity, Channel): + chat_type = "channel" if entity.broadcast else "supergroup" + title = entity.title + username = entity.username + else: + logger.warning(f"Unknown chat type: {type(entity)}") + return {} + + return { + "id": entity.id, + "title": title, + "username": username, + "type": chat_type + } + + @handle_telegram_errors + async def process_dialog(self, dialog: Dialog) -> Dict[str, Any]: + """Process a dialog and convert it to a dictionary. + + Args: + dialog: Dialog from Telegram API + + Returns: + Dict: Standardized dialog representation + """ + chat_info = await self.process_chat_entity(dialog.entity) + chat_info["last_message_time"] = dialog.date + return chat_info + + @handle_telegram_errors + async def process_message(self, message: Message) -> Optional[Dict[str, Any]]: + """Process a message and convert it to a dictionary. + + Args: + message: Message from Telegram API + + Returns: + Dict: Standardized message representation + """ + if not message.text: + return None # Skip non-text messages + + # Get the chat + chat = message.chat + if not chat: + return None + + chat_info = await self.process_chat_entity(chat) + + # Get sender information + sender = await message.get_sender() + sender_id = sender.id if sender else 0 + sender_name = get_display_name(sender) if sender else "Unknown" + + # Check if the message is from the current user + my_id = (await self.client.get_me()).id + is_from_me = sender_id == my_id + + return { + "id": message.id, + "chat_id": chat_info["id"], + "chat_title": chat_info["title"], + "sender_id": sender_id, + "sender_name": sender_name, + "content": message.text, + "timestamp": message.date, + "is_from_me": is_from_me + } + + @handle_telegram_errors + async def find_entity_by_name_or_id(self, recipient: str) -> Optional[Any]: + """Find an entity by name or ID. + + Args: + recipient: Recipient identifier (ID, username, or title) + + Returns: + Any: Found entity or None + """ + # Try to parse as an integer (chat ID) + try: + chat_id = int(recipient) + return await self.client.get_entity(chat_id) + except ValueError: + pass + + # Not an integer, try as username + if recipient.startswith("@"): + recipient = recipient[1:] # Remove @ if present + + try: + return await self.client.get_entity(recipient) + except Exception: + logger.error(f"Could not find entity: {recipient}") + return None \ No newline at end of file diff --git a/telegram-bridge/api/models.py b/telegram-bridge/api/models.py new file mode 100644 index 0000000..50e565d --- /dev/null +++ b/telegram-bridge/api/models.py @@ -0,0 +1,49 @@ +""" +API models for data transfer. + +Defines Pydantic models for request and response data validation. +""" + +from datetime import datetime +from typing import Optional, List +from pydantic import BaseModel, Field + + +class ChatModel(BaseModel): + """Model representing a Telegram chat.""" + id: int + title: str + username: Optional[str] = None + type: str + last_message_time: Optional[datetime] = None + + +class MessageModel(BaseModel): + """Model representing a Telegram message.""" + id: int + chat_id: int + chat_title: str + sender_id: int + sender_name: str + content: str + timestamp: datetime + is_from_me: bool = False + + +class MessageContextModel(BaseModel): + """Model representing a message with its context.""" + message: MessageModel + before: List[MessageModel] = [] + after: List[MessageModel] = [] + + +class SendMessageRequest(BaseModel): + """Model for sending message requests.""" + recipient: str + message: str + + +class SendMessageResponse(BaseModel): + """Model for sending message responses.""" + success: bool + message: str \ No newline at end of file From f1299b3d60fe6753dd214456d44ebb9d32672c9f Mon Sep 17 00:00:00 2001 From: Muhammad18557 Date: Sat, 5 Apr 2025 16:14:14 +0800 Subject: [PATCH 05/10] database for telegram interactions --- telegram-bridge/database/__init__.py | 5 + telegram-bridge/database/base.py | 42 +++++ telegram-bridge/database/models.py | 58 ++++++ telegram-bridge/database/repositories.py | 228 +++++++++++++++++++++++ 4 files changed, 333 insertions(+) create mode 100644 telegram-bridge/database/__init__.py create mode 100644 telegram-bridge/database/base.py create mode 100644 telegram-bridge/database/models.py create mode 100644 telegram-bridge/database/repositories.py diff --git a/telegram-bridge/database/__init__.py b/telegram-bridge/database/__init__.py new file mode 100644 index 0000000..4e00d85 --- /dev/null +++ b/telegram-bridge/database/__init__.py @@ -0,0 +1,5 @@ +""" +Database module for the Telegram bridge. + +Provides ORM models, connection management, and repositories for data access. +""" \ No newline at end of file diff --git a/telegram-bridge/database/base.py b/telegram-bridge/database/base.py new file mode 100644 index 0000000..e27ef4e --- /dev/null +++ b/telegram-bridge/database/base.py @@ -0,0 +1,42 @@ +""" +Database configuration and session management. + +Provides a SQLAlchemy engine and session factory with connection pooling. +""" + +import os +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker, scoped_session +from sqlalchemy.pool import QueuePool + +from database.models import Base + +# Get database path +STORE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "store") +os.makedirs(STORE_DIR, exist_ok=True) +DB_PATH = os.path.join(STORE_DIR, "messages.db") + +# Create engine with connection pooling +engine = create_engine( + f"sqlite:///{DB_PATH}", + connect_args={"check_same_thread": False}, # Needed for SQLite + poolclass=QueuePool, + pool_size=5, + max_overflow=10, + pool_timeout=30, + pool_recycle=3600, +) + +# Create session factory +SessionFactory = sessionmaker(bind=engine) +Session = scoped_session(SessionFactory) + + +def init_db(): + """Initialize the database schema.""" + Base.metadata.create_all(engine) + + +def get_session(): + """Get a database session from the pool.""" + return Session() \ No newline at end of file diff --git a/telegram-bridge/database/models.py b/telegram-bridge/database/models.py new file mode 100644 index 0000000..357599d --- /dev/null +++ b/telegram-bridge/database/models.py @@ -0,0 +1,58 @@ +""" +Database models for the Telegram bridge. + +This module defines SQLAlchemy ORM models for storing Telegram chats and messages. +""" + +from sqlalchemy import Column, Integer, String, Text, Boolean, ForeignKey, Index, DateTime +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship +from datetime import datetime + +Base = declarative_base() + + +class Chat(Base): + """Represents a Telegram chat (direct message, group, channel, etc.).""" + + __tablename__ = "chats" + + id = Column(Integer, primary_key=True) + title = Column(String, nullable=False) + username = Column(String) + type = Column(String, nullable=False) + last_message_time = Column(DateTime) + + # Relationship with messages + messages = relationship("Message", back_populates="chat", cascade="all, delete-orphan") + + def __repr__(self): + return f"" + + +class Message(Base): + """Represents a Telegram message with all its metadata.""" + + __tablename__ = "messages" + + id = Column(Integer, primary_key=True) + chat_id = Column(Integer, ForeignKey("chats.id"), primary_key=True) + sender_id = Column(Integer) + sender_name = Column(String) + content = Column(Text) + timestamp = Column(DateTime, default=datetime.now) + is_from_me = Column(Boolean, default=False) + + # Relationship with chat + chat = relationship("Chat", back_populates="messages") + + # Indexes for improved query performance + __table_args__ = ( + Index("idx_messages_chat_id", "chat_id"), + Index("idx_messages_timestamp", "timestamp"), + Index("idx_messages_content", "content"), + Index("idx_messages_sender_id", "sender_id"), + ) + + def __repr__(self): + return f"" \ No newline at end of file diff --git a/telegram-bridge/database/repositories.py b/telegram-bridge/database/repositories.py new file mode 100644 index 0000000..c2b7e1b --- /dev/null +++ b/telegram-bridge/database/repositories.py @@ -0,0 +1,228 @@ +""" +Repository classes for database operations. + +Provides abstraction for data access operations on Telegram chats and messages. +""" + +from datetime import datetime +from typing import List, Optional, Dict, Any, Tuple +from sqlalchemy import desc, or_, and_ + +from database.base import get_session +from database.models import Chat, Message + + +class ChatRepository: + """Repository for chat operations.""" + + def store_chat( + self, + chat_id: int, + title: str, + username: Optional[str], + chat_type: str, + last_message_time: datetime, + ) -> None: + """Store a chat in the database.""" + session = get_session() + try: + chat = session.query(Chat).filter_by(id=chat_id).first() + + if chat: + # Update existing chat + chat.title = title + chat.username = username + chat.type = chat_type + chat.last_message_time = last_message_time + else: + # Create new chat + chat = Chat( + id=chat_id, + title=title, + username=username, + type=chat_type, + last_message_time=last_message_time + ) + session.add(chat) + + session.commit() + finally: + session.close() + + def get_chats( + self, + query: Optional[str] = None, + limit: int = 50, + offset: int = 0, + chat_type: Optional[str] = None, + sort_by: str = "last_message_time" + ) -> List[Chat]: + """Get chats from the database.""" + session = get_session() + try: + # Build query + db_query = session.query(Chat) + + # Apply filters + if query: + db_query = db_query.filter( + or_( + Chat.title.ilike(f"%{query}%"), + Chat.username.ilike(f"%{query}%") + ) + ) + + if chat_type: + db_query = db_query.filter(Chat.type == chat_type) + + # Apply sorting + if sort_by == "last_message_time": + db_query = db_query.order_by(desc(Chat.last_message_time)) + else: + db_query = db_query.order_by(Chat.title) + + # Apply pagination + db_query = db_query.limit(limit).offset(offset) + + return db_query.all() + finally: + session.close() + + def get_chat_by_id(self, chat_id: int) -> Optional[Chat]: + """Get a chat by its ID.""" + session = get_session() + try: + return session.query(Chat).filter_by(id=chat_id).first() + finally: + session.close() + + +class MessageRepository: + """Repository for message operations.""" + + def store_message( + self, + message_id: int, + chat_id: int, + sender_id: int, + sender_name: str, + content: str, + timestamp: datetime, + is_from_me: bool, + ) -> None: + """Store a message in the database.""" + if not content: # Skip empty messages + return + + session = get_session() + try: + message = session.query(Message).filter_by( + id=message_id, chat_id=chat_id + ).first() + + if message: + # Update existing message + message.sender_id = sender_id + message.sender_name = sender_name + message.content = content + message.timestamp = timestamp + message.is_from_me = is_from_me + else: + # Create new message + message = Message( + id=message_id, + chat_id=chat_id, + sender_id=sender_id, + sender_name=sender_name, + content=content, + timestamp=timestamp, + is_from_me=is_from_me + ) + session.add(message) + + session.commit() + finally: + session.close() + + def get_messages( + self, + chat_id: Optional[int] = None, + sender_id: Optional[int] = None, + query: Optional[str] = None, + limit: int = 50, + offset: int = 0, + date_range: Optional[Tuple[datetime, datetime]] = None, + ) -> List[Message]: + """Get messages from the database.""" + session = get_session() + try: + # Build query + db_query = session.query(Message).join(Chat) + + # Apply filters + filters = [] + + if chat_id: + filters.append(Message.chat_id == chat_id) + + if sender_id: + filters.append(Message.sender_id == sender_id) + + if query: + filters.append(Message.content.ilike(f"%{query}%")) + + if date_range: + start_date, end_date = date_range + filters.append(and_( + Message.timestamp >= start_date, + Message.timestamp <= end_date + )) + + if filters: + db_query = db_query.filter(and_(*filters)) + + # Apply sorting and pagination + db_query = db_query.order_by(desc(Message.timestamp)) + db_query = db_query.limit(limit).offset(offset) + + return db_query.all() + finally: + session.close() + + def get_message_context( + self, + message_id: int, + chat_id: int, + before: int = 5, + after: int = 5 + ) -> Dict[str, Any]: + """Get context around a specific message.""" + session = get_session() + try: + # Get the target message + target_message = session.query(Message).filter_by( + id=message_id, chat_id=chat_id + ).first() + + if not target_message: + raise ValueError(f"Message with ID {message_id} in chat {chat_id} not found") + + # Get messages before + before_messages = session.query(Message).filter( + Message.chat_id == chat_id, + Message.timestamp < target_message.timestamp + ).order_by(desc(Message.timestamp)).limit(before).all() + + # Get messages after + after_messages = session.query(Message).filter( + Message.chat_id == chat_id, + Message.timestamp > target_message.timestamp + ).order_by(Message.timestamp).limit(after).all() + + return { + "message": target_message, + "before": before_messages, + "after": after_messages + } + finally: + session.close() \ No newline at end of file From 0dd5719169632837b76f2f8101e0f64be16934da Mon Sep 17 00:00:00 2001 From: Muhammad18557 Date: Sat, 5 Apr 2025 16:14:26 +0800 Subject: [PATCH 06/10] server for telegarm interactions --- telegram-bridge/server/__init__.py | 7 ++ telegram-bridge/server/app.py | 170 +++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+) create mode 100644 telegram-bridge/server/__init__.py create mode 100644 telegram-bridge/server/app.py diff --git a/telegram-bridge/server/__init__.py b/telegram-bridge/server/__init__.py new file mode 100644 index 0000000..cab5aa1 --- /dev/null +++ b/telegram-bridge/server/__init__.py @@ -0,0 +1,7 @@ +""" +Server module for the Telegram bridge. + +Provides FastAPI application for HTTP endpoints. +""" + +from server.app import app, get_telegram_service \ No newline at end of file diff --git a/telegram-bridge/server/app.py b/telegram-bridge/server/app.py new file mode 100644 index 0000000..d42664a --- /dev/null +++ b/telegram-bridge/server/app.py @@ -0,0 +1,170 @@ +""" +FastAPI application for the Telegram bridge. + +Provides HTTP API endpoints for interacting with Telegram. +""" + +from fastapi import FastAPI, HTTPException, Depends +from typing import List, Optional + +from api.models import ( + ChatModel, + MessageModel, + MessageContextModel, + SendMessageRequest, + SendMessageResponse +) +from service import TelegramService + +# Create FastAPI app +app = FastAPI( + title="Telegram Bridge API", + description="API for interacting with Telegram", + version="1.0.0" +) + +# Service dependency +def get_telegram_service() -> TelegramService: + """Get the Telegram service instance. + + This function should be replaced with a proper dependency injection + mechanism to return the singleton instance of the TelegramService. + """ + # This is a placeholder. In main.py, we'll set this to the actual service instance + raise NotImplementedError("Telegram service not initialized") + + +@app.get("/api/chats", response_model=List[ChatModel]) +async def list_chats( + query: Optional[str] = None, + limit: int = 50, + offset: int = 0, + chat_type: Optional[str] = None, + sort_by: str = "last_message_time", + service: TelegramService = Depends(get_telegram_service) +): + """List chats.""" + chats = service.chat_repo.get_chats( + query=query, + limit=limit, + offset=offset, + chat_type=chat_type, + sort_by=sort_by + ) + return [ + ChatModel( + id=chat.id, + title=chat.title, + username=chat.username, + type=chat.type, + last_message_time=chat.last_message_time + ) for chat in chats + ] + + +@app.get("/api/messages", response_model=List[MessageModel]) +async def list_messages( + chat_id: Optional[int] = None, + sender_id: Optional[int] = None, + query: Optional[str] = None, + limit: int = 50, + offset: int = 0, + service: TelegramService = Depends(get_telegram_service) +): + """List messages.""" + messages = service.message_repo.get_messages( + chat_id=chat_id, + sender_id=sender_id, + query=query, + limit=limit, + offset=offset + ) + return [ + MessageModel( + id=msg.id, + chat_id=msg.chat_id, + chat_title=msg.chat.title, + sender_id=msg.sender_id, + sender_name=msg.sender_name, + content=msg.content, + timestamp=msg.timestamp, + is_from_me=msg.is_from_me + ) for msg in messages + ] + + +@app.get("/api/messages/{chat_id}/{message_id}/context", response_model=MessageContextModel) +async def get_message_context( + chat_id: int, + message_id: int, + before: int = 5, + after: int = 5, + service: TelegramService = Depends(get_telegram_service) +): + """Get context around a message.""" + try: + context = service.message_repo.get_message_context( + message_id=message_id, + chat_id=chat_id, + before=before, + after=after + ) + + # Convert to model + target_message = MessageModel( + id=context["message"].id, + chat_id=context["message"].chat_id, + chat_title=context["message"].chat.title, + sender_id=context["message"].sender_id, + sender_name=context["message"].sender_name, + content=context["message"].content, + timestamp=context["message"].timestamp, + is_from_me=context["message"].is_from_me + ) + + before_messages = [ + MessageModel( + id=msg.id, + chat_id=msg.chat_id, + chat_title=msg.chat.title, + sender_id=msg.sender_id, + sender_name=msg.sender_name, + content=msg.content, + timestamp=msg.timestamp, + is_from_me=msg.is_from_me + ) for msg in context["before"] + ] + + after_messages = [ + MessageModel( + id=msg.id, + chat_id=msg.chat_id, + chat_title=msg.chat.title, + sender_id=msg.sender_id, + sender_name=msg.sender_name, + content=msg.content, + timestamp=msg.timestamp, + is_from_me=msg.is_from_me + ) for msg in context["after"] + ] + + return MessageContextModel( + message=target_message, + before=before_messages, + after=after_messages + ) + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + +@app.post("/api/send", response_model=SendMessageResponse) +async def send_message( + request: SendMessageRequest, + service: TelegramService = Depends(get_telegram_service) +): + """Send a message to a Telegram recipient.""" + success, message = await service.send_message( + recipient=request.recipient, + message=request.message + ) + return SendMessageResponse(success=success, message=message) \ No newline at end of file From 5bc8ff4c70eb0518d18e799ba81771cf8f4b01dc Mon Sep 17 00:00:00 2001 From: Muhammad18557 Date: Sat, 5 Apr 2025 16:28:16 +0800 Subject: [PATCH 07/10] fix imports --- telegram-bridge/__init__.py | 6 +++++- telegram-bridge/api/__init__.py | 12 +++++++++++- telegram-bridge/database/__init__.py | 6 +++++- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/telegram-bridge/__init__.py b/telegram-bridge/__init__.py index b4cbc5f..ff69fc0 100644 --- a/telegram-bridge/__init__.py +++ b/telegram-bridge/__init__.py @@ -2,4 +2,8 @@ Telegram Bridge. Provides a bridge between Telegram API and HTTP API for sending and receiving messages. -""" \ No newline at end of file +""" + +from api import TelegramApiClient +from database import init_db + diff --git a/telegram-bridge/api/__init__.py b/telegram-bridge/api/__init__.py index 0bb9df1..a8698c5 100644 --- a/telegram-bridge/api/__init__.py +++ b/telegram-bridge/api/__init__.py @@ -2,4 +2,14 @@ API module for Telegram interaction. Provides client, middleware, and models for interacting with the Telegram API. -""" \ No newline at end of file +""" + +from api.client import TelegramApiClient +from api.middleware import TelegramMiddleware, handle_telegram_errors +from api.models import ( + ChatModel, + MessageModel, + MessageContextModel, + SendMessageRequest, + SendMessageResponse +) diff --git a/telegram-bridge/database/__init__.py b/telegram-bridge/database/__init__.py index 4e00d85..81a1eb2 100644 --- a/telegram-bridge/database/__init__.py +++ b/telegram-bridge/database/__init__.py @@ -2,4 +2,8 @@ Database module for the Telegram bridge. Provides ORM models, connection management, and repositories for data access. -""" \ No newline at end of file +""" + +from database.base import init_db, get_session +from database.models import Chat, Message +from database.repositories import ChatRepository, MessageRepository From 094ed9b4bebae8fe11063fcb347963ae7b497024 Mon Sep 17 00:00:00 2001 From: Muhammad18557 Date: Sat, 5 Apr 2025 17:01:21 +0800 Subject: [PATCH 08/10] refactor telegram for mcp --- telegram-mcp-server/telegram/__init__.py | 29 +++ telegram-mcp-server/telegram/api.py | 47 +++++ .../{telegram.py => telegram/database.py} | 185 +----------------- telegram-mcp-server/telegram/models.py | 77 ++++++++ 4 files changed, 158 insertions(+), 180 deletions(-) create mode 100644 telegram-mcp-server/telegram/__init__.py create mode 100644 telegram-mcp-server/telegram/api.py rename telegram-mcp-server/{telegram.py => telegram/database.py} (69%) create mode 100644 telegram-mcp-server/telegram/models.py diff --git a/telegram-mcp-server/telegram/__init__.py b/telegram-mcp-server/telegram/__init__.py new file mode 100644 index 0000000..a8d8c64 --- /dev/null +++ b/telegram-mcp-server/telegram/__init__.py @@ -0,0 +1,29 @@ +""" +This module implements data models and functions for retrieving and sending +Telegram messages, managing chats, and working with contacts. + +The module connects to a SQLite database that stores all Telegram messages and chat data, +which is maintained by the Telegram Bridge. It also provides an HTTP client for sending +messages via the Bridge's API endpoint. + +Main features: +- Data models for messages, chats, contacts, and message context +- Database access functions for retrieving messages, chats, and contacts +- HTTP client for sending messages through the Telegram Bridge +- Helper functions for displaying formatted messages and chats +""" + +import os.path + +# Database path +MESSAGES_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '..', 'telegram-bridge', 'store', 'messages.db') +TELEGRAM_API_BASE_URL = "http://localhost:8081/api" + +# Import all components to make them available at the module level +from .models import Message, Chat, Contact, MessageContext +from .display import print_message, print_messages_list, print_chat, print_chats_list +from .api import send_message +from .database import ( + search_contacts, list_messages, get_message_context, list_chats, + get_chat, get_direct_chat_by_contact, get_contact_chats, get_last_interaction +) \ No newline at end of file diff --git a/telegram-mcp-server/telegram/api.py b/telegram-mcp-server/telegram/api.py new file mode 100644 index 0000000..2f8c039 --- /dev/null +++ b/telegram-mcp-server/telegram/api.py @@ -0,0 +1,47 @@ +""" +API client for interacting with the Telegram Bridge API. +""" + +import requests +import json +from typing import Tuple + +from . import TELEGRAM_API_BASE_URL + +def send_message(recipient: str, message: str) -> Tuple[bool, str]: + """Send a Telegram message to the specified recipient. + + Args: + recipient: The recipient - either a username (with or without @), + or a chat ID as a string or integer + message: The message text to send + + Returns: + Tuple[bool, str]: A tuple containing success status and a status message + """ + try: + # Validate input + if not recipient: + return False, "Recipient must be provided" + + url = f"{TELEGRAM_API_BASE_URL}/send" + payload = { + "recipient": recipient, + "message": message + } + + response = requests.post(url, json=payload) + + # Check if the request was successful + if response.status_code == 200: + result = response.json() + return result.get("success", False), result.get("message", "Unknown response") + else: + return False, f"Error: HTTP {response.status_code} - {response.text}" + + except requests.RequestException as e: + return False, f"Request error: {str(e)}" + except json.JSONDecodeError: + return False, f"Error parsing response: Unknown" + except Exception as e: + return False, f"Unexpected error: {str(e)}" \ No newline at end of file diff --git a/telegram-mcp-server/telegram.py b/telegram-mcp-server/telegram/database.py similarity index 69% rename from telegram-mcp-server/telegram.py rename to telegram-mcp-server/telegram/database.py index fe1c649..2a1e6bb 100644 --- a/telegram-mcp-server/telegram.py +++ b/telegram-mcp-server/telegram/database.py @@ -1,150 +1,13 @@ """ -This module implements data models and functions for retrieving and sending -Telegram messages, managing chats, and working with contacts. - -The module connects to a SQLite database that stores all Telegram messages and chat data, -which is maintained by the Telegram Bridge. It also provides an HTTP client for sending -messages via the Bridge's API endpoint. - -Main features: -- Data models for messages, chats, contacts, and message context -- Database access functions for retrieving messages, chats, and contacts -- HTTP client for sending messages through the Telegram Bridge -- Helper functions for displaying formatted messages and chats - -All database operations use parameterized queries to prevent SQL injection. +Database operations for retrieving and managing Telegram data. """ import sqlite3 -import requests -import json -import os.path from datetime import datetime -from dataclasses import dataclass -from typing import Optional, List, Tuple, Dict, Any +from typing import Optional, List, Tuple -# Database path -MESSAGES_DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'telegram-bridge', 'store', 'messages.db') -TELEGRAM_API_BASE_URL = "http://localhost:8081/api" - -@dataclass -class Message: - """ - Represents a Telegram message with all its metadata. - - Attributes: - id: Unique message identifier - chat_id: ID of the chat the message belongs to - chat_title: Title of the chat (user name, group name, etc.) - sender_name: Name of the message sender - content: Text content of the message - timestamp: Date and time when the message was sent - is_from_me: Boolean indicating if the message was sent by the user - sender_id: ID of the message sender - """ - id: int - chat_id: int - chat_title: str - sender_name: str - content: str - timestamp: datetime - is_from_me: bool - sender_id: int - -@dataclass -class Chat: - """ - Represents a Telegram chat (direct message, group, channel, etc.). - - Attributes: - id: Unique chat identifier - title: Name of the chat (user name, group name, etc.) - username: Optional Telegram username (without @) - type: Type of chat ('user', 'group', 'channel', 'supergroup') - last_message_time: Timestamp of the most recent message in the chat - """ - id: int - title: str - username: Optional[str] - type: str - last_message_time: Optional[datetime] - -@dataclass -class Contact: - """ - Represents a Telegram contact. - - Attributes: - id: Unique contact identifier - username: Optional Telegram username (without @) - name: Display name of the contact - """ - id: int - username: Optional[str] - name: str - -@dataclass -class MessageContext: - """ - Provides context around a specific message. - - Attributes: - message: The target message - before: List of messages that came before the target message - after: List of messages that came after the target message - """ - message: Message - before: List[Message] - after: List[Message] - -def print_message(message: Message, show_chat_info: bool = True) -> None: - """Print a single message with consistent formatting.""" - direction = "→" if message.is_from_me else "←" - - if show_chat_info: - print(f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] {direction} Chat: {message.chat_title} (ID: {message.chat_id})") - else: - print(f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] {direction}") - - print(f"From: {'Me' if message.is_from_me else message.sender_name}") - print(f"Message: {message.content}") - print("-" * 100) - -def print_messages_list(messages: List[Message], title: str = "", show_chat_info: bool = True) -> None: - """Print a list of messages with a title and consistent formatting.""" - if not messages: - print("No messages to display.") - return - - if title: - print(f"\n{title}") - print("-" * 100) - - for message in messages: - print_message(message, show_chat_info) - -def print_chat(chat: Chat) -> None: - """Print a single chat with consistent formatting.""" - print(f"Chat: {chat.title} (ID: {chat.id})") - print(f"Type: {chat.type}") - if chat.username: - print(f"Username: @{chat.username}") - if chat.last_message_time: - print(f"Last active: {chat.last_message_time:%Y-%m-%d %H:%M:%S}") - print("-" * 100) - -def print_chats_list(chats: List[Chat], title: str = "") -> None: - """Print a list of chats with a title and consistent formatting.""" - if not chats: - print("No chats to display.") - return - - if title: - print(f"\n{title}") - print("-" * 100) - - for chat in chats: - print_chat(chat) +from . import MESSAGES_DB_PATH +from .models import Message, Chat, Contact, MessageContext def search_contacts(query: str) -> List[Contact]: """Search contacts by name or username.""" @@ -579,42 +442,4 @@ def get_last_interaction(contact_id: int) -> Optional[Message]: return None finally: if 'conn' in locals(): - conn.close() - -def send_message(recipient: str, message: str) -> Tuple[bool, str]: - """Send a Telegram message to the specified recipient. - - Args: - recipient: The recipient - either a username (with or without @), - or a chat ID as a string or integer - message: The message text to send - - Returns: - Tuple[bool, str]: A tuple containing success status and a status message - """ - try: - # Validate input - if not recipient: - return False, "Recipient must be provided" - - url = f"{TELEGRAM_API_BASE_URL}/send" - payload = { - "recipient": recipient, - "message": message - } - - response = requests.post(url, json=payload) - - # Check if the request was successful - if response.status_code == 200: - result = response.json() - return result.get("success", False), result.get("message", "Unknown response") - else: - return False, f"Error: HTTP {response.status_code} - {response.text}" - - except requests.RequestException as e: - return False, f"Request error: {str(e)}" - except json.JSONDecodeError: - return False, f"Error parsing response: Unknown" - except Exception as e: - return False, f"Unexpected error: {str(e)}" \ No newline at end of file + conn.close() \ No newline at end of file diff --git a/telegram-mcp-server/telegram/models.py b/telegram-mcp-server/telegram/models.py new file mode 100644 index 0000000..ddee0e6 --- /dev/null +++ b/telegram-mcp-server/telegram/models.py @@ -0,0 +1,77 @@ +""" +Data models for Telegram entities. +""" + +from dataclasses import dataclass +from datetime import datetime +from typing import Optional, List + +@dataclass +class Message: + """ + Represents a Telegram message with all its metadata. + + Attributes: + id: Unique message identifier + chat_id: ID of the chat the message belongs to + chat_title: Title of the chat (user name, group name, etc.) + sender_name: Name of the message sender + content: Text content of the message + timestamp: Date and time when the message was sent + is_from_me: Boolean indicating if the message was sent by the user + sender_id: ID of the message sender + """ + id: int + chat_id: int + chat_title: str + sender_name: str + content: str + timestamp: datetime + is_from_me: bool + sender_id: int + +@dataclass +class Chat: + """ + Represents a Telegram chat (direct message, group, channel, etc.). + + Attributes: + id: Unique chat identifier + title: Name of the chat (user name, group name, etc.) + username: Optional Telegram username (without @) + type: Type of chat ('user', 'group', 'channel', 'supergroup') + last_message_time: Timestamp of the most recent message in the chat + """ + id: int + title: str + username: Optional[str] + type: str + last_message_time: Optional[datetime] + +@dataclass +class Contact: + """ + Represents a Telegram contact. + + Attributes: + id: Unique contact identifier + username: Optional Telegram username (without @) + name: Display name of the contact + """ + id: int + username: Optional[str] + name: str + +@dataclass +class MessageContext: + """ + Provides context around a specific message. + + Attributes: + message: The target message + before: List of messages that came before the target message + after: List of messages that came after the target message + """ + message: Message + before: List[Message] + after: List[Message] \ No newline at end of file From e12dca71a96db6817ff7ee9f52835805582fdc0f Mon Sep 17 00:00:00 2001 From: Muhammad18557 Date: Sat, 5 Apr 2025 17:01:32 +0800 Subject: [PATCH 09/10] dipslay functions --- telegram-mcp-server/telegram/display.py | 55 +++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 telegram-mcp-server/telegram/display.py diff --git a/telegram-mcp-server/telegram/display.py b/telegram-mcp-server/telegram/display.py new file mode 100644 index 0000000..80e3e06 --- /dev/null +++ b/telegram-mcp-server/telegram/display.py @@ -0,0 +1,55 @@ +""" +Functions for displaying Telegram messages and chats in a formatted way. +""" + +from typing import List +from .models import Message, Chat + +def print_message(message: Message, show_chat_info: bool = True) -> None: + """Print a single message with consistent formatting.""" + direction = "→" if message.is_from_me else "←" + + if show_chat_info: + print(f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] {direction} Chat: {message.chat_title} (ID: {message.chat_id})") + else: + print(f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] {direction}") + + print(f"From: {'Me' if message.is_from_me else message.sender_name}") + print(f"Message: {message.content}") + print("-" * 100) + +def print_messages_list(messages: List[Message], title: str = "", show_chat_info: bool = True) -> None: + """Print a list of messages with a title and consistent formatting.""" + if not messages: + print("No messages to display.") + return + + if title: + print(f"\n{title}") + print("-" * 100) + + for message in messages: + print_message(message, show_chat_info) + +def print_chat(chat: Chat) -> None: + """Print a single chat with consistent formatting.""" + print(f"Chat: {chat.title} (ID: {chat.id})") + print(f"Type: {chat.type}") + if chat.username: + print(f"Username: @{chat.username}") + if chat.last_message_time: + print(f"Last active: {chat.last_message_time:%Y-%m-%d %H:%M:%S}") + print("-" * 100) + +def print_chats_list(chats: List[Chat], title: str = "") -> None: + """Print a list of chats with a title and consistent formatting.""" + if not chats: + print("No chats to display.") + return + + if title: + print(f"\n{title}") + print("-" * 100) + + for chat in chats: + print_chat(chat) \ No newline at end of file From 13b670f7127e6b75e70f64f740b7a4f193f6f040 Mon Sep 17 00:00:00 2001 From: Muhammad18557 Date: Sat, 5 Apr 2025 17:17:08 +0800 Subject: [PATCH 10/10] use sqlalchemy to query db without plain sql --- telegram-mcp-server/telegram/database.py | 435 +++++++++++------------ 1 file changed, 202 insertions(+), 233 deletions(-) diff --git a/telegram-mcp-server/telegram/database.py b/telegram-mcp-server/telegram/database.py index 2a1e6bb..d5f8e84 100644 --- a/telegram-mcp-server/telegram/database.py +++ b/telegram-mcp-server/telegram/database.py @@ -1,48 +1,65 @@ """ Database operations for retrieving and managing Telegram data. +Uses SQLAlchemy ORM for database access instead of raw SQL. """ -import sqlite3 +from sqlalchemy import create_engine, or_, and_, desc +from sqlalchemy.orm import sessionmaker, scoped_session from datetime import datetime from typing import Optional, List, Tuple from . import MESSAGES_DB_PATH from .models import Message, Chat, Contact, MessageContext +# Initialize SQLAlchemy engine and session +engine = create_engine( + f"sqlite:///{MESSAGES_DB_PATH}", + connect_args={"check_same_thread": False} # Needed for SQLite +) +SessionFactory = sessionmaker(bind=engine) +Session = scoped_session(SessionFactory) + +# Import SQLAlchemy models from telegram-bridge +import sys +import os + +# Add the parent directory to path to find telegram-bridge +bridge_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), 'telegram-bridge') +sys.path.append(bridge_path) + +# Import models directly from telegram-bridge +from database.models import Chat as DBChat, Message as DBMessage + def search_contacts(query: str) -> List[Contact]: """Search contacts by name or username.""" try: - conn = sqlite3.connect(MESSAGES_DB_PATH) - cursor = conn.cursor() + session = Session() # Search in chats where type is 'user' - cursor.execute(""" - SELECT id, username, title - FROM chats - WHERE type = 'user' AND (title LIKE ? OR username LIKE ?) - ORDER BY title - LIMIT 50 - """, (f"%{query}%", f"%{query}%")) - - contacts = cursor.fetchall() + db_contacts = session.query(DBChat).filter( + DBChat.type == 'user', + or_( + DBChat.title.ilike(f"%{query}%"), + DBChat.username.ilike(f"%{query}%") + ) + ).order_by(DBChat.title).limit(50).all() result = [] - for contact_data in contacts: + for db_contact in db_contacts: contact = Contact( - id=contact_data[0], - username=contact_data[1], - name=contact_data[2] + id=db_contact.id, + username=db_contact.username, + name=db_contact.title ) result.append(contact) return result - except sqlite3.Error as e: + except Exception as e: print(f"Database error: {e}") return [] finally: - if 'conn' in locals(): - conn.close() + session.close() def list_messages( date_range: Optional[Tuple[datetime, datetime]] = None, @@ -57,66 +74,50 @@ def list_messages( ) -> List[Message]: """Get messages matching the specified criteria with optional context.""" try: - conn = sqlite3.connect(MESSAGES_DB_PATH) - cursor = conn.cursor() + session = Session() - # Build base query - query_parts = [""" - SELECT - m.id, - m.chat_id, - c.title, - m.sender_name, - m.content, - m.timestamp, - m.is_from_me, - m.sender_id - FROM messages m - """] - query_parts.append("JOIN chats c ON m.chat_id = c.id") - where_clauses = [] - params = [] + # Build base query with join + db_query = session.query(DBMessage, DBChat).join(DBChat) # Add filters + filters = [] if date_range: - where_clauses.append("m.timestamp BETWEEN ? AND ?") - params.extend([date_range[0].isoformat(), date_range[1].isoformat()]) + filters.append(and_( + DBMessage.timestamp >= date_range[0], + DBMessage.timestamp <= date_range[1] + )) if sender_id: - where_clauses.append("m.sender_id = ?") - params.append(sender_id) + filters.append(DBMessage.sender_id == sender_id) if chat_id: - where_clauses.append("m.chat_id = ?") - params.append(chat_id) + filters.append(DBMessage.chat_id == chat_id) if query: - where_clauses.append("LOWER(m.content) LIKE LOWER(?)") - params.append(f"%{query}%") + filters.append(DBMessage.content.ilike(f"%{query}%")) - if where_clauses: - query_parts.append("WHERE " + " AND ".join(where_clauses)) + if filters: + db_query = db_query.filter(and_(*filters)) # Add pagination offset = page * limit - query_parts.append("ORDER BY m.timestamp DESC") - query_parts.append("LIMIT ? OFFSET ?") - params.extend([limit, offset]) + db_query = db_query.order_by(desc(DBMessage.timestamp)) + db_query = db_query.limit(limit).offset(offset) - cursor.execute(" ".join(query_parts), tuple(params)) - messages = cursor.fetchall() + # Execute query + db_results = db_query.all() result = [] - for msg in messages: + for db_msg, db_chat in db_results: message = Message( - id=msg[0], - chat_id=msg[1], - chat_title=msg[2], - sender_name=msg[3], - content=msg[4], - timestamp=datetime.fromisoformat(msg[5]), - is_from_me=bool(msg[6]), - sender_id=msg[7] + id=db_msg.id, + chat_id=db_msg.chat_id, + chat_title=db_chat.title, + sender_name=db_msg.sender_name, + content=db_msg.content, + timestamp=db_msg.timestamp, + is_from_me=db_msg.is_from_me, + sender_id=db_msg.sender_id ) result.append(message) @@ -132,12 +133,11 @@ def list_messages( return result - except sqlite3.Error as e: + except Exception as e: print(f"Database error: {e}") return [] finally: - if 'conn' in locals(): - conn.close() + session.close() def get_message_context( message_id: int, @@ -147,76 +147,75 @@ def get_message_context( ) -> MessageContext: """Get context around a specific message.""" try: - conn = sqlite3.connect(MESSAGES_DB_PATH) - cursor = conn.cursor() + session = Session() # Get the target message first - cursor.execute(""" - SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id - FROM messages m - JOIN chats c ON m.chat_id = c.id - WHERE m.id = ? AND m.chat_id = ? - """, (message_id, chat_id)) - msg_data = cursor.fetchone() + result = session.query(DBMessage, DBChat) \ + .join(DBChat) \ + .filter(DBMessage.id == message_id, DBMessage.chat_id == chat_id) \ + .first() - if not msg_data: + if not result: raise ValueError(f"Message with ID {message_id} in chat {chat_id} not found") + db_msg, db_chat = result target_message = Message( - id=msg_data[0], - chat_id=msg_data[1], - chat_title=msg_data[2], - sender_name=msg_data[3], - content=msg_data[4], - timestamp=datetime.fromisoformat(msg_data[5]), - is_from_me=bool(msg_data[6]), - sender_id=msg_data[7] + id=db_msg.id, + chat_id=db_msg.chat_id, + chat_title=db_chat.title, + sender_name=db_msg.sender_name, + content=db_msg.content, + timestamp=db_msg.timestamp, + is_from_me=db_msg.is_from_me, + sender_id=db_msg.sender_id ) # Get messages before - cursor.execute(""" - SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id - FROM messages m - JOIN chats c ON m.chat_id = c.id - WHERE m.chat_id = ? AND m.timestamp < ? - ORDER BY m.timestamp DESC - LIMIT ? - """, (chat_id, target_message.timestamp.isoformat(), before)) + before_results = session.query(DBMessage, DBChat) \ + .join(DBChat) \ + .filter( + DBMessage.chat_id == chat_id, + DBMessage.timestamp < target_message.timestamp + ) \ + .order_by(desc(DBMessage.timestamp)) \ + .limit(before) \ + .all() before_messages = [] - for msg in cursor.fetchall(): + for db_msg, db_chat in before_results: before_messages.append(Message( - id=msg[0], - chat_id=msg[1], - chat_title=msg[2], - sender_name=msg[3], - content=msg[4], - timestamp=datetime.fromisoformat(msg[5]), - is_from_me=bool(msg[6]), - sender_id=msg[7] + id=db_msg.id, + chat_id=db_msg.chat_id, + chat_title=db_chat.title, + sender_name=db_msg.sender_name, + content=db_msg.content, + timestamp=db_msg.timestamp, + is_from_me=db_msg.is_from_me, + sender_id=db_msg.sender_id )) # Get messages after - cursor.execute(""" - SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id - FROM messages m - JOIN chats c ON m.chat_id = c.id - WHERE m.chat_id = ? AND m.timestamp > ? - ORDER BY m.timestamp ASC - LIMIT ? - """, (chat_id, target_message.timestamp.isoformat(), after)) + after_results = session.query(DBMessage, DBChat) \ + .join(DBChat) \ + .filter( + DBMessage.chat_id == chat_id, + DBMessage.timestamp > target_message.timestamp + ) \ + .order_by(DBMessage.timestamp) \ + .limit(after) \ + .all() after_messages = [] - for msg in cursor.fetchall(): + for db_msg, db_chat in after_results: after_messages.append(Message( - id=msg[0], - chat_id=msg[1], - chat_title=msg[2], - sender_name=msg[3], - content=msg[4], - timestamp=datetime.fromisoformat(msg[5]), - is_from_me=bool(msg[6]), - sender_id=msg[7] + id=db_msg.id, + chat_id=db_msg.chat_id, + chat_title=db_chat.title, + sender_name=db_msg.sender_name, + content=db_msg.content, + timestamp=db_msg.timestamp, + is_from_me=db_msg.is_from_me, + sender_id=db_msg.sender_id )) return MessageContext( @@ -225,12 +224,11 @@ def get_message_context( after=after_messages ) - except sqlite3.Error as e: + except Exception as e: print(f"Database error: {e}") raise finally: - if 'conn' in locals(): - conn.close() + session.close() def list_chats( query: Optional[str] = None, @@ -241,124 +239,108 @@ def list_chats( ) -> List[Chat]: """Get chats matching the specified criteria.""" try: - conn = sqlite3.connect(MESSAGES_DB_PATH) - cursor = conn.cursor() + session = Session() # Build base query - query_parts = ["SELECT id, title, username, type, last_message_time FROM chats"] + db_query = session.query(DBChat) - where_clauses = [] - params = [] + # Add filters + filters = [] if query: - where_clauses.append("(LOWER(title) LIKE LOWER(?) OR LOWER(username) LIKE LOWER(?))") - params.extend([f"%{query}%", f"%{query}%"]) + filters.append(or_( + DBChat.title.ilike(f"%{query}%"), + DBChat.username.ilike(f"%{query}%") + )) if chat_type: - where_clauses.append("type = ?") - params.append(chat_type) + filters.append(DBChat.type == chat_type) - if where_clauses: - query_parts.append("WHERE " + " AND ".join(where_clauses)) + if filters: + db_query = db_query.filter(and_(*filters)) # Add sorting - order_by = "last_message_time DESC" if sort_by == "last_active" else "title" - query_parts.append(f"ORDER BY {order_by}") + if sort_by == "last_active": + db_query = db_query.order_by(desc(DBChat.last_message_time)) + else: + db_query = db_query.order_by(DBChat.title) # Add pagination - offset = (page) * limit - query_parts.append("LIMIT ? OFFSET ?") - params.extend([limit, offset]) + offset = page * limit + db_query = db_query.limit(limit).offset(offset) - cursor.execute(" ".join(query_parts), tuple(params)) - chats = cursor.fetchall() + # Execute query + db_chats = db_query.all() result = [] - for chat_data in chats: - last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None + for db_chat in db_chats: chat = Chat( - id=chat_data[0], - title=chat_data[1], - username=chat_data[2], - type=chat_data[3], - last_message_time=last_message_time + id=db_chat.id, + title=db_chat.title, + username=db_chat.username, + type=db_chat.type, + last_message_time=db_chat.last_message_time ) result.append(chat) return result - except sqlite3.Error as e: + except Exception as e: print(f"Database error: {e}") return [] finally: - if 'conn' in locals(): - conn.close() + session.close() def get_chat(chat_id: int) -> Optional[Chat]: """Get chat metadata by ID.""" try: - conn = sqlite3.connect(MESSAGES_DB_PATH) - cursor = conn.cursor() + session = Session() - cursor.execute(""" - SELECT id, title, username, type, last_message_time - FROM chats - WHERE id = ? - """, (chat_id,)) + db_chat = session.query(DBChat).filter(DBChat.id == chat_id).first() - chat_data = cursor.fetchone() - - if not chat_data: + if not db_chat: return None - last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None return Chat( - id=chat_data[0], - title=chat_data[1], - username=chat_data[2], - type=chat_data[3], - last_message_time=last_message_time + id=db_chat.id, + title=db_chat.title, + username=db_chat.username, + type=db_chat.type, + last_message_time=db_chat.last_message_time ) - except sqlite3.Error as e: + except Exception as e: print(f"Database error: {e}") return None finally: - if 'conn' in locals(): - conn.close() + session.close() def get_direct_chat_by_contact(contact_id: int) -> Optional[Chat]: """Get direct chat metadata by contact ID.""" try: - conn = sqlite3.connect(MESSAGES_DB_PATH) - cursor = conn.cursor() + session = Session() - cursor.execute(""" - SELECT id, title, username, type, last_message_time - FROM chats - WHERE id = ? AND type = 'user' - """, (contact_id,)) + db_chat = session.query(DBChat).filter( + DBChat.id == contact_id, + DBChat.type == 'user' + ).first() - chat_data = cursor.fetchone() - - if not chat_data: + if not db_chat: return None - last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None return Chat( - id=chat_data[0], - title=chat_data[1], - username=chat_data[2], - type=chat_data[3], - last_message_time=last_message_time + id=db_chat.id, + title=db_chat.title, + username=db_chat.username, + type=db_chat.type, + last_message_time=db_chat.last_message_time ) - except sqlite3.Error as e: + except Exception as e: print(f"Database error: {e}") return None finally: - if 'conn' in locals(): - conn.close() + session.close() def get_contact_chats(contact_id: int, limit: int = 20, page: int = 0) -> List[Chat]: """Get all chats involving the contact. @@ -369,77 +351,64 @@ def get_contact_chats(contact_id: int, limit: int = 20, page: int = 0) -> List[C page: Page number for pagination (default 0) """ try: - conn = sqlite3.connect(MESSAGES_DB_PATH) - cursor = conn.cursor() + session = Session() - cursor.execute(""" - SELECT DISTINCT - c.id, c.title, c.username, c.type, c.last_message_time - FROM chats c - JOIN messages m ON c.id = m.chat_id - WHERE m.sender_id = ? OR c.id = ? - ORDER BY c.last_message_time DESC - LIMIT ? OFFSET ? - """, (contact_id, contact_id, limit, page * limit)) - - chats = cursor.fetchall() + # Using a subquery to get distinct chats for the contact + db_chats = session.query(DBChat).join(DBMessage, DBChat.id == DBMessage.chat_id).filter( + or_( + DBMessage.sender_id == contact_id, + DBChat.id == contact_id + ) + ).distinct().order_by(desc(DBChat.last_message_time)).limit(limit).offset(page * limit).all() result = [] - for chat_data in chats: - last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None + for db_chat in db_chats: chat = Chat( - id=chat_data[0], - title=chat_data[1], - username=chat_data[2], - type=chat_data[3], - last_message_time=last_message_time + id=db_chat.id, + title=db_chat.title, + username=db_chat.username, + type=db_chat.type, + last_message_time=db_chat.last_message_time ) result.append(chat) return result - except sqlite3.Error as e: + except Exception as e: print(f"Database error: {e}") return [] finally: - if 'conn' in locals(): - conn.close() + session.close() def get_last_interaction(contact_id: int) -> Optional[Message]: """Get most recent message involving the contact.""" try: - conn = sqlite3.connect(MESSAGES_DB_PATH) - cursor = conn.cursor() + session = Session() - cursor.execute(""" - SELECT - m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id - FROM messages m - JOIN chats c ON m.chat_id = c.id - WHERE m.sender_id = ? OR c.id = ? - ORDER BY m.timestamp DESC - LIMIT 1 - """, (contact_id, contact_id)) + result = session.query(DBMessage, DBChat).join(DBChat).filter( + or_( + DBMessage.sender_id == contact_id, + DBChat.id == contact_id + ) + ).order_by(desc(DBMessage.timestamp)).first() - msg_data = cursor.fetchone() - - if not msg_data: + if not result: return None + db_msg, db_chat = result return Message( - id=msg_data[0], - chat_id=msg_data[1], - chat_title=msg_data[2], - sender_name=msg_data[3], - content=msg_data[4], - timestamp=datetime.fromisoformat(msg_data[5]), - is_from_me=bool(msg_data[6]), - sender_id=msg_data[7] + id=db_msg.id, + chat_id=db_msg.chat_id, + chat_title=db_chat.title, + sender_name=db_msg.sender_name, + content=db_msg.content, + timestamp=db_msg.timestamp, + is_from_me=db_msg.is_from_me, + sender_id=db_msg.sender_id ) - except sqlite3.Error as e: + except Exception as e: print(f"Database error: {e}") return None finally: - if 'conn' in locals(): - conn.close() \ No newline at end of file + session.close() \ No newline at end of file