commit f6a7d983e57420b7970a49d62247eca8bad747d7 Author: Muhammad18557 Date: Fri Apr 4 15:42:47 2025 +0800 Initial commit: Telegram MCP server and bridge diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a874a15 --- /dev/null +++ b/.gitignore @@ -0,0 +1,58 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Virtual environments +myenv/ +venv/ +ENV/ +env/ +.env + +# Telegram session files +telegram-bridge/store/*.session +telegram-bridge/store/*.session-journal + +# Database files with messages +telegram-bridge/store/*.db +telegram-bridge/store/*.db-shm +telegram-bridge/store/*.db-wal + +# Logs +logs/ +*.log + +# IDE files +.idea/ +.vscode/ +*.swp +*.swo +.DS_Store + +# Sensitive information +telegram-bridge/.env +*.key +*.pem +*.crt + +# User-specific files +claude_desktop_config.json +mcp.json \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..4e3a978 --- /dev/null +++ b/README.md @@ -0,0 +1,177 @@ +# Telegram MCP Server + +This is a Model Context Protocol (MCP) server for Telegram. + +With this you can search your personal Telegram messages, search your contacts, and send messages to either individuals or groups. + +It connects to your **personal Telegram account** directly via the Telegram API (using the [Telethon](https://github.com/LonamiWebs/Telethon) library). All your messages are stored locally in a SQLite database and only sent to an LLM (such as Claude) when the agent accesses them through tools (which you control). + +## Installation + +### Prerequisites + +- Python 3.6+ +- Anthropic Claude Desktop app (or Cursor) + +### Steps + +1. **Clone this repository** + + ```bash + git clone https://github.com/yourusername/telegram-mcp.git + cd telegram-mcp + ``` + Create a Python virtual environment and activate it: + ```bash + python3 -m venv myenv + source myenv/bin/activate + ``` + Install dependencies: + ```bash + pip install -r requirements.txt + ``` + + This is the environment that can be used both for running the **Telegram bridge** and the **MCP server**. + + +2. **Get your Telegram API credentials** + + - Go to https://my.telegram.org/auth + - Log in and go to "API development tools" + - Create a new application + - Note your API ID and API hash + +3. **Run the Telegram bridge** + + Navigate to the telegram-bridge directory: + + ```bash + cd telegram-bridge + ``` + + Set up your API credentials as environment variables either by exporting them: + + ```bash + export TELEGRAM_API_ID=your_api_id + export TELEGRAM_API_HASH=your_api_hash + ``` + + Or by creating a `.env` file based on the provided `.env.example`: + + ```bash + cp .env.example .env + nano .env # or use your preferred text editor + ``` + + Then update the values in the `.env` file: + ``` + TELEGRAM_API_ID=your_api_id + TELEGRAM_API_HASH=your_api_hash + ``` + + Run the Python application: + + ```bash + python main.py + ``` + + The first time you run it, you will be prompted to enter your phone number and the verification code sent to your Telegram account. + +4. **Connect to the MCP server** + + First, update the `run_telegram_server.sh` script with your absolute repository path: + + ```bash + # Open the script in your preferred editor + nano run_telegram_server.sh + + # Update line 4 with your absolute path to the repository + # Change this: + BASE_DIR="/Users/muhammadabdullah/Desktop/mcp/telegram-mcp" + # To your actual BASE_DIR path (get it by running `pwd` in the telegram-mcp directory) + ``` + + Then, configure the MCP server by creating a JSON configuration file with the following format: + + ```json + { + "mcpServers": { + "telegram": { + "command": "/bin/bash", + "args": [ + "{{BASE_DIR}}/run_telegram_server.sh" // BASE_DIR is the same as above + ] + } + } + } + ``` + + For **Claude**, save this as `claude_desktop_config.json` in your Claude Desktop configuration directory at: + + ``` + ~/Library/Application\ Support/Claude/claude_desktop_config.json + ``` + + For **Cursor**, save this as `mcp.json` in your Cursor configuration directory at: + + ``` + ~/.cursor/mcp.json + ``` + +5. **Restart Claude Desktop / Cursor** + + Open Claude Desktop and you should now see Telegram as an available integration. + + Or restart Cursor. + +## Architecture Overview + +This application consists of two main components: + +1. **Python Telegram Bridge** (`telegram-bridge/`): A Python application that connects to Telegram's API, handles authentication, and stores message history in SQLite. It serves as the bridge between Telegram and the MCP server. Can real-time sync for latest messages. + +2. **Python MCP Server** (`telegram-mcp-server/`): A Python server implementing the Model Context Protocol (MCP), which provides standardized tools for Claude to interact with Telegram data and send/receive messages. + +### Data Storage + +- All message history is stored in a SQLite database within the `telegram-bridge/store/` directory +- The database maintains tables for chats and messages +- Messages are indexed for efficient searching and retrieval + +## Usage + +Once connected, you can interact with your Telegram contacts through Claude, leveraging Claude's AI capabilities in your Telegram conversations. + +### MCP Tools + +Claude can access the following tools to interact with Telegram: + +- **search_contacts**: Search for contacts by name or username +- **list_messages**: Retrieve messages with optional filters and context +- **list_chats**: List available chats with metadata +- **get_chat**: Get information about a specific chat +- **get_direct_chat_by_contact**: Find a direct chat with a specific contact +- **get_contact_chats**: List all chats involving a specific contact +- **get_last_interaction**: Get the most recent message with a contact +- **get_message_context**: Retrieve context around a specific message +- **send_message**: Send a Telegram message to a specified username or chat ID + +## Technical Details + +1. Claude sends requests to the Python MCP server +2. The MCP server queries the Telegram bridge or directly the SQLite database +3. The bridge accesses the Telegram API and keeps the SQLite database up to date +4. Data flows back through the chain to Claude +5. When sending messages, the request flows from Claude through the MCP server to the Telegram bridge and to Telegram + +## Troubleshooting + +- Make sure both the Telegram bridge and the Python server are running for the integration to work properly. + +### Authentication Issues + +- **Session expired**: If your session expires, you might need to re-authenticate. Delete the `telegram-bridge/store/telegram_session.session` file and restart the bridge. +- **Two-factor authentication**: If you have 2FA enabled on your Telegram account, you'll be prompted for your password during authentication. +- **No Messages Loading**: After initial authentication, it can take several minutes for your message history to load, especially if you have many chats. + +For additional Claude Desktop integration troubleshooting, see the [MCP documentation](https://modelcontextprotocol.io/quickstart/server#claude-for-desktop-integration-issues). \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..040260a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +# Telegram Bridge dependencies +telethon +cryptg +python-dotenv +requests + +# MCP Server dependencies +fastmcp +python-dateutil \ No newline at end of file diff --git a/run_telegram_server.sh b/run_telegram_server.sh new file mode 100755 index 0000000..951cfd9 --- /dev/null +++ b/run_telegram_server.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +# Define the base directory and environment path +BASE_DIR="/Users/muhammadabdullah/Desktop/mcp/telegram-mcp" +ENV_PATH="$BASE_DIR/myenv" + +# Check if the virtual environment exists +if [ ! -d "$ENV_PATH" ]; then + python3 -m venv "$ENV_PATH" + source "$ENV_PATH/bin/activate" + pip install -r "$BASE_DIR/requirements.txt" > /dev/null 2>&1 +else + source "$ENV_PATH/bin/activate" +fi + +# Run the MCP server +python "$BASE_DIR/telegram-mcp-server/main.py" \ No newline at end of file diff --git a/telegram-bridge/.env.example b/telegram-bridge/.env.example new file mode 100644 index 0000000..0106164 --- /dev/null +++ b/telegram-bridge/.env.example @@ -0,0 +1,4 @@ +# Telegram API credentials +# Get these from https://my.telegram.org/auth +TELEGRAM_API_ID=your_app_id +TELEGRAM_API_HASH=your_hash \ No newline at end of file diff --git a/telegram-bridge/main.py b/telegram-bridge/main.py new file mode 100644 index 0000000..69bb236 --- /dev/null +++ b/telegram-bridge/main.py @@ -0,0 +1,610 @@ +""" +Telegram Bridge Module + +This module connects to the Telegram API using the Telethon library, enabling +interaction with Telegram chats and messages. It provides an HTTP server for +sending messages to Telegram users or groups, and stores message history in +a SQLite database. + +Key Features: +- Connects to Telegram using API credentials. +- Stores chat and message data in a database. +- Provides an HTTP API for sending messages. +- Synchronizes message history and processes new messages. + +Usage: +- Ensure TELEGRAM_API_ID and TELEGRAM_API_HASH are set in environment variables. +- Run the script to start the Telegram bridge and HTTP server. +- Use the '/api/send' endpoint to send messages via HTTP requests. +""" + +import os +import sqlite3 +import json +import asyncio +import logging +import sys +from datetime import datetime +from typing import Dict, Any, Optional, List, Tuple +from http.server import HTTPServer, BaseHTTPRequestHandler +import threading +from dotenv import load_dotenv + +from telethon import TelegramClient, events +from telethon.tl.types import ( + User, + Chat, + Channel, + Message, + Dialog, +) +from telethon.utils import get_display_name + +# Global variable to store the main event loop +main_loop = None + +# Load environment variables from .env file if it exists +load_dotenv() + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger(__name__) + +# Directory for storing data +STORE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "store") +os.makedirs(STORE_DIR, exist_ok=True) + +# Database path +DB_PATH = os.path.join(STORE_DIR, "messages.db") + +# API credentials from environment variables +API_ID = os.getenv("TELEGRAM_API_ID") +API_HASH = os.getenv("TELEGRAM_API_HASH") + +if not API_ID or not API_HASH: + logger.error( + "TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables must be set" + ) + logger.error("Get them from https://my.telegram.org/auth") + sys.exit(1) + +# Initialize the Telegram client +SESSION_FILE = os.path.join(STORE_DIR, "telegram_session") +client = TelegramClient(SESSION_FILE, API_ID, API_HASH) + + +class MessageStore: + """Handles storage and retrieval of Telegram messages in SQLite.""" + + def __init__(self, db_path: str): + """Initialize the message store with the given database path.""" + self.db_path = db_path + self.init_db() + + def init_db(self): + """Initialize the database with necessary tables.""" + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + # Create tables if they don't exist + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS chats ( + id INTEGER PRIMARY KEY, + title TEXT, + username TEXT, + type TEXT, + last_message_time TIMESTAMP + ) + """ + ) + + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER, + chat_id INTEGER, + sender_id INTEGER, + sender_name TEXT, + content TEXT, + timestamp TIMESTAMP, + is_from_me BOOLEAN, + PRIMARY KEY (id, chat_id), + FOREIGN KEY (chat_id) REFERENCES chats(id) + ) + """ + ) + + # Create indexes for efficient queries + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_messages_chat_id ON messages(chat_id)" + ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp)" + ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_messages_content ON messages(content)" + ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_messages_sender_id ON messages(sender_id)" + ) + + conn.commit() + conn.close() + + def store_chat( + self, + chat_id: int, + title: str, + username: Optional[str], + chat_type: str, + last_message_time: datetime, + ) -> None: + """Store a chat in the database.""" + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + cursor.execute( + "INSERT OR REPLACE INTO chats (id, title, username, type, last_message_time) VALUES (?, ?, ?, ?, ?)", + (chat_id, title, username, chat_type, last_message_time.isoformat()), + ) + + conn.commit() + conn.close() + + def store_message( + self, + message_id: int, + chat_id: int, + sender_id: int, + sender_name: str, + content: str, + timestamp: datetime, + is_from_me: bool, + ) -> None: + """Store a message in the database.""" + if not content: # Skip empty messages + return + + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + cursor.execute( + """INSERT OR REPLACE INTO messages + (id, chat_id, sender_id, sender_name, content, timestamp, is_from_me) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + ( + message_id, + chat_id, + sender_id, + sender_name, + content, + timestamp.isoformat(), + is_from_me, + ), + ) + + conn.commit() + conn.close() + + def get_messages( + self, + chat_id: Optional[int] = None, + limit: int = 50, + query: Optional[str] = None, + offset: int = 0, + ) -> List[Dict[str, Any]]: + """Get messages from the database.""" + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + query_parts = [ + "SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id FROM messages m" + ] + query_parts.append("JOIN chats c ON m.chat_id = c.id") + + conditions = [] + params = [] + + if chat_id: + conditions.append("m.chat_id = ?") + params.append(chat_id) + + if query: + conditions.append("m.content LIKE ?") + params.append(f"%{query}%") + + if conditions: + query_parts.append("WHERE " + " AND ".join(conditions)) + + query_parts.append("ORDER BY m.timestamp DESC") + query_parts.append("LIMIT ? OFFSET ?") + params.extend([limit, offset]) + + cursor.execute(" ".join(query_parts), tuple(params)) + messages = cursor.fetchall() + + results = [] + for msg in messages: + timestamp = datetime.fromisoformat(msg[5]) + results.append( + { + "id": msg[0], + "chat_id": msg[1], + "chat_title": msg[2], + "sender_name": msg[3], + "content": msg[4], + "timestamp": timestamp, + "is_from_me": msg[6], + "sender_id": msg[7], + } + ) + + conn.close() + return results + + def get_chats( + self, limit: int = 50, query: Optional[str] = None + ) -> List[Dict[str, Any]]: + """Get chats from the database.""" + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + query_parts = ["SELECT id, title, username, type, last_message_time FROM chats"] + params = [] + + if query: + query_parts.append("WHERE title LIKE ? OR username LIKE ?") + params.extend([f"%{query}%", f"%{query}%"]) + + query_parts.append("ORDER BY last_message_time DESC") + query_parts.append("LIMIT ?") + params.append(limit) + + cursor.execute(" ".join(query_parts), tuple(params)) + chats = cursor.fetchall() + + results = [] + for chat in chats: + last_message_time = datetime.fromisoformat(chat[4]) if chat[4] else None + results.append( + { + "id": chat[0], + "title": chat[1], + "username": chat[2], + "type": chat[3], + "last_message_time": last_message_time, + } + ) + + conn.close() + return results + + +# Create message store +message_store = MessageStore(DB_PATH) + + +async def process_message(message: Message) -> None: + """Process and store a message.""" + if not message.text: + return # Skip non-text messages + + # Get the chat + chat = message.chat + if not chat: + return + + chat_id = message.chat_id + + # Determine chat type and name + if isinstance(chat, User): + chat_type = "user" + title = get_display_name(chat) + username = chat.username + elif isinstance(chat, Chat): + chat_type = "group" + title = chat.title + username = None + elif isinstance(chat, Channel): + chat_type = "channel" if chat.broadcast else "supergroup" + title = chat.title + username = chat.username + else: + logger.warning(f"Unknown chat type: {type(chat)}") + return + + # Store chat information + message_store.store_chat( + chat_id=chat_id, + title=title, + username=username, + chat_type=chat_type, + last_message_time=message.date, + ) + + # Get sender information + sender = await message.get_sender() + sender_id = sender.id if sender else 0 + sender_name = get_display_name(sender) if sender else "Unknown" + + # Check if the message is from the current user + my_id = (await client.get_me()).id + is_from_me = sender_id == my_id + + # Store the message + message_store.store_message( + message_id=message.id, + chat_id=chat_id, + sender_id=sender_id, + sender_name=sender_name, + content=message.text, + timestamp=message.date, + is_from_me=is_from_me, + ) + + logger.info( + f"Stored message: [{message.date}] {sender_name} in {title}: {message.text[:30]}..." + ) + + +async def sync_dialog_history(dialog: Dialog, limit: int = 100) -> None: + """Sync message history for a specific dialog.""" + chat_entity = dialog.entity + + # Extract chat info + if isinstance(chat_entity, User): + chat_type = "user" + title = get_display_name(chat_entity) + username = chat_entity.username + elif isinstance(chat_entity, Chat): + chat_type = "group" + title = chat_entity.title + username = None + elif isinstance(chat_entity, Channel): + chat_type = "channel" if chat_entity.broadcast else "supergroup" + title = chat_entity.title + username = chat_entity.username + else: + logger.warning(f"Unknown chat type: {type(chat_entity)}") + return + + # Store chat info with last message time + message_store.store_chat( + chat_id=dialog.id, + title=title, + username=username, + chat_type=chat_type, + last_message_time=dialog.date, + ) + + # Get messages + messages = await client.get_messages(dialog.entity, limit=limit) + + # Get current user ID + my_id = (await client.get_me()).id + + # Process each message + for message in messages: + if not message.text: + continue # Skip non-text messages + + # Get sender information + try: + sender = await message.get_sender() + sender_id = sender.id if sender else 0 + sender_name = get_display_name(sender) if sender else "Unknown" + except Exception as e: + logger.error(f"Error getting sender: {e}") + sender_id = 0 + sender_name = "Unknown" + + is_from_me = sender_id == my_id + + # Store the message + message_store.store_message( + message_id=message.id, + chat_id=dialog.id, + sender_id=sender_id, + sender_name=sender_name, + content=message.text, + timestamp=message.date, + is_from_me=is_from_me, + ) + + logger.info(f"Synced {len(messages)} messages from {title}") + + +async def sync_all_dialogs() -> None: + """Sync message history for all dialogs.""" + logger.info("Starting synchronization of all dialogs") + + # Get all dialogs (chats) + dialogs = await client.get_dialogs(limit=100) + + for dialog in dialogs: + try: + await sync_dialog_history(dialog) + except Exception as e: + logger.error(f"Error syncing dialog {dialog.name}: {e}") + + logger.info(f"Completed synchronization of {len(dialogs)} dialogs") + + +# HTTP server for API endpoints +class TelegramAPIHandler(BaseHTTPRequestHandler): + def do_POST(self): + content_length = int(self.headers["Content-Length"]) + post_data = self.rfile.read(content_length) + request = json.loads(post_data.decode("utf-8")) + + if self.path == "/api/send": + self._handle_send_message(request) + else: + self.send_error(404, "Endpoint not found") + + def _handle_send_message(self, request): + recipient = request.get("recipient") + message_text = request.get("message") + + if not recipient or not message_text: + self._send_json_response( + 400, {"success": False, "message": "Recipient and message are required"} + ) + return + + try: + # Instead of creating a new event loop, use a shared queue to communicate with the main thread + # Create a Future to hold the result + future = asyncio.run_coroutine_threadsafe( + send_message(recipient, message_text), main_loop + ) + + # Wait for the result (with timeout) + try: + success, message = future.result(10) # Wait up to 10 seconds + self._send_json_response( + 200 if success else 500, {"success": success, "message": message} + ) + except asyncio.TimeoutError: + self._send_json_response( + 504, + { + "success": False, + "message": "Request timed out while sending message", + }, + ) + except Exception as inner_e: + logger.error(f"Error in Future: {inner_e}") + self._send_json_response( + 500, + {"success": False, "message": f"Error in Future: {str(inner_e)}"}, + ) + + except Exception as e: + logger.error(f"Error sending message: {e}") + self._send_json_response( + 500, {"success": False, "message": f"Error: {str(e)}"} + ) + + def _send_json_response(self, status_code, data): + self.send_response(status_code) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(data).encode("utf-8")) + + +async def send_message(recipient: str, message: str) -> Tuple[bool, str]: + """Send a message to a Telegram recipient.""" + if not client.is_connected(): + return False, "Not connected to Telegram" + + try: + # Try to parse recipient as an integer (chat ID) + try: + chat_id = int(recipient) + entity = await client.get_entity(chat_id) + except ValueError: + # Not an integer, try as username + if recipient.startswith("@"): + recipient = recipient[1:] # Remove @ if present + try: + entity = await client.get_entity(recipient) + except Exception: + # Try to find in database + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute( + "SELECT id FROM chats WHERE title LIKE ? OR username = ?", + (f"%{recipient}%", recipient), + ) + result = cursor.fetchone() + conn.close() + + if result: + entity = await client.get_entity(result[0]) + else: + return False, f"Recipient not found: {recipient}" + + # Send the message + await client.send_message(entity, message) + return True, f"Message sent to {recipient}" + + except Exception as e: + logger.error(f"Error sending message: {e}") + return False, f"Error sending message: {str(e)}" + + +# Start HTTP server in a separate thread +def start_http_server(port: int = 8081): + server_address = ("", port) + httpd = HTTPServer(server_address, TelegramAPIHandler) + logger.info(f"Starting HTTP server on port {port}") + httpd.serve_forever() + + +async def main(): + global main_loop + + # Store the current event loop + main_loop = asyncio.get_event_loop() + + logger.info("Starting Telegram bridge") + + # Connect to Telegram + await client.connect() + + # Check if we're already authorized + if not await client.is_user_authorized(): + logger.info("Need to log in. Please enter your phone number:") + phone = input("Phone number: ") + await client.send_code_request(phone) + logger.info("Code sent. Please enter the code you received:") + code = input("Code: ") + try: + await client.sign_in(phone, code) + except Exception as e: + logger.error(f"Error signing in: {e}") + logger.info( + "If you have two-factor authentication enabled, please enter your password:" + ) + password = input("Password: ") + await client.sign_in(password=password) + + logger.info("Successfully logged in to Telegram") + + # Start HTTP server in a separate thread + server_thread = threading.Thread(target=start_http_server) + server_thread.daemon = True + server_thread.start() + + # Register event handler for new messages + @client.on(events.NewMessage) + async def handle_new_message(event): + await process_message(event.message) + + # Initial sync of message history + await sync_all_dialogs() + + # Keep the script running + logger.info("Telegram bridge is running. Press Ctrl+C to exit.") + try: + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + pass + + +if __name__ == "__main__": + # Run the main function + try: + asyncio.run(main()) + except KeyboardInterrupt: + logger.info("Shutting down Telegram bridge") + except Exception as e: + logger.error(f"Unexpected error: {e}") + sys.exit(1) diff --git a/telegram-mcp-server/main.py b/telegram-mcp-server/main.py new file mode 100644 index 0000000..e9266e7 --- /dev/null +++ b/telegram-mcp-server/main.py @@ -0,0 +1,193 @@ +from typing import List, Dict, Any, Optional, Tuple +from mcp.server.fastmcp import FastMCP +from datetime import datetime +from telegram import ( + search_contacts as telegram_search_contacts, + list_messages as telegram_list_messages, + list_chats as telegram_list_chats, + get_chat as telegram_get_chat, + get_direct_chat_by_contact as telegram_get_direct_chat_by_contact, + get_contact_chats as telegram_get_contact_chats, + get_last_interaction as telegram_get_last_interaction, + get_message_context as telegram_get_message_context, + send_message as telegram_send_message +) + +# Initialize FastMCP server +mcp = FastMCP("telegram") + +@mcp.tool() +def search_contacts(query: str) -> List[Dict[str, Any]]: + """Search Telegram contacts by name or username. + + Args: + query: Search term to match against contact names or usernames + """ + contacts = telegram_search_contacts(query) + return contacts + +@mcp.tool() +def list_messages( + date_range: Optional[Tuple[datetime, datetime]] = None, + sender_id: Optional[int] = None, + chat_id: Optional[int] = None, + query: Optional[str] = None, + limit: int = 20, + page: int = 0, + include_context: bool = True, + context_before: int = 1, + context_after: int = 1 +) -> List[Dict[str, Any]]: + """Get Telegram messages matching specified criteria with optional context. + + Args: + date_range: Optional tuple of (start_date, end_date) to filter messages by date + sender_id: Optional sender ID to filter messages by sender + chat_id: Optional chat ID to filter messages by chat + query: Optional search term to filter messages by content + limit: Maximum number of messages to return (default 20) + page: Page number for pagination (default 0) + include_context: Whether to include messages before and after matches (default True) + context_before: Number of messages to include before each match (default 1) + context_after: Number of messages to include after each match (default 1) + """ + messages = telegram_list_messages( + date_range=date_range, + sender_id=sender_id, + chat_id=chat_id, + query=query, + limit=limit, + page=page, + include_context=include_context, + context_before=context_before, + context_after=context_after + ) + return messages + +@mcp.tool() +def list_chats( + query: Optional[str] = None, + limit: int = 20, + page: int = 0, + chat_type: Optional[str] = None, + sort_by: str = "last_active" +) -> List[Dict[str, Any]]: + """Get Telegram chats matching specified criteria. + + Args: + query: Optional search term to filter chats by name or username + limit: Maximum number of chats to return (default 20) + page: Page number for pagination (default 0) + chat_type: Optional chat type filter ("user", "group", "channel", or "supergroup") + sort_by: Field to sort results by, either "last_active" or "title" (default "last_active") + """ + chats = telegram_list_chats( + query=query, + limit=limit, + page=page, + chat_type=chat_type, + sort_by=sort_by + ) + return chats + +@mcp.tool() +def get_chat(chat_id: int) -> Dict[str, Any]: + """Get Telegram chat metadata by ID. + + Args: + chat_id: The ID of the chat to retrieve + """ + chat = telegram_get_chat(chat_id) + return chat + +@mcp.tool() +def get_direct_chat_by_contact(contact_id: int) -> Dict[str, Any]: + """Get Telegram chat metadata by contact ID. + + Args: + contact_id: The contact ID to search for + """ + chat = telegram_get_direct_chat_by_contact(contact_id) + return chat + +@mcp.tool() +def get_contact_chats(contact_id: int, limit: int = 20, page: int = 0) -> List[Dict[str, Any]]: + """Get all Telegram chats involving the contact. + + Args: + contact_id: The contact's ID to search for + limit: Maximum number of chats to return (default 20) + page: Page number for pagination (default 0) + """ + chats = telegram_get_contact_chats(contact_id, limit, page) + return chats + +@mcp.tool() +def get_last_interaction(contact_id: int) -> Dict[str, Any]: + """Get most recent Telegram message involving the contact. + + Args: + contact_id: The ID of the contact to search for + """ + message = telegram_get_last_interaction(contact_id) + return message + +@mcp.tool() +def get_message_context( + message_id: int, + chat_id: int, + before: int = 5, + after: int = 5 +) -> Dict[str, Any]: + """Get context around a specific Telegram message. + + Args: + message_id: The ID of the message to get context for + chat_id: The ID of the chat containing the message + before: Number of messages to include before the target message (default 5) + after: Number of messages to include after the target message (default 5) + """ + context = telegram_get_message_context(message_id, chat_id, before, after) + return context + +@mcp.tool() +def send_message( + recipient: str, + message: str +) -> Dict[str, Any]: + """Send a Telegram message to a person or group. + + Args: + recipient: The recipient - either a username (with or without @) or a chat ID + message: The message text to send + + Returns: + A dictionary containing success status and a status message + """ + # Validate input + if not recipient: + return { + "success": False, + "message": "Recipient must be provided" + } + + # Call the telegram_send_message function + success, status_message = telegram_send_message(recipient, message) + return { + "success": success, + "message": status_message + } + +if __name__ == "__main__": + # Redirect stdout/stderr to suppress initial output that might confuse Claude + import sys + import os + + # Create logs directory + os.makedirs(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'logs'), exist_ok=True) + + # Redirect stderr to log file + sys.stderr = open(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'logs', 'mcp_error.log'), 'w') + + # Initialize and run the server + mcp.run(transport='stdio') \ No newline at end of file diff --git a/telegram-mcp-server/telegram.py b/telegram-mcp-server/telegram.py new file mode 100644 index 0000000..c837b0a --- /dev/null +++ b/telegram-mcp-server/telegram.py @@ -0,0 +1,564 @@ +import sqlite3 +import requests +import json +import os.path +from datetime import datetime +from dataclasses import dataclass +from typing import Optional, List, Tuple, Dict, Any + +# Database path +MESSAGES_DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'telegram-bridge', 'store', 'messages.db') +TELEGRAM_API_BASE_URL = "http://localhost:8081/api" + +@dataclass +class Message: + id: int + chat_id: int + chat_title: str + sender_name: str + content: str + timestamp: datetime + is_from_me: bool + sender_id: int + +@dataclass +class Chat: + id: int + title: str + username: Optional[str] + type: str + last_message_time: Optional[datetime] + +@dataclass +class Contact: + id: int + username: Optional[str] + name: str + +@dataclass +class MessageContext: + message: Message + before: List[Message] + after: List[Message] + +def print_message(message: Message, show_chat_info: bool = True) -> None: + """Print a single message with consistent formatting.""" + direction = "→" if message.is_from_me else "←" + + if show_chat_info: + print(f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] {direction} Chat: {message.chat_title} (ID: {message.chat_id})") + else: + print(f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] {direction}") + + print(f"From: {'Me' if message.is_from_me else message.sender_name}") + print(f"Message: {message.content}") + print("-" * 100) + +def print_messages_list(messages: List[Message], title: str = "", show_chat_info: bool = True) -> None: + """Print a list of messages with a title and consistent formatting.""" + if not messages: + print("No messages to display.") + return + + if title: + print(f"\n{title}") + print("-" * 100) + + for message in messages: + print_message(message, show_chat_info) + +def print_chat(chat: Chat) -> None: + """Print a single chat with consistent formatting.""" + print(f"Chat: {chat.title} (ID: {chat.id})") + print(f"Type: {chat.type}") + if chat.username: + print(f"Username: @{chat.username}") + if chat.last_message_time: + print(f"Last active: {chat.last_message_time:%Y-%m-%d %H:%M:%S}") + print("-" * 100) + +def print_chats_list(chats: List[Chat], title: str = "") -> None: + """Print a list of chats with a title and consistent formatting.""" + if not chats: + print("No chats to display.") + return + + if title: + print(f"\n{title}") + print("-" * 100) + + for chat in chats: + print_chat(chat) + +def search_contacts(query: str) -> List[Contact]: + """Search contacts by name or username.""" + try: + conn = sqlite3.connect(MESSAGES_DB_PATH) + cursor = conn.cursor() + + # Search in chats where type is 'user' + cursor.execute(""" + SELECT id, username, title + FROM chats + WHERE type = 'user' AND (title LIKE ? OR username LIKE ?) + ORDER BY title + LIMIT 50 + """, (f"%{query}%", f"%{query}%")) + + contacts = cursor.fetchall() + + result = [] + for contact_data in contacts: + contact = Contact( + id=contact_data[0], + username=contact_data[1], + name=contact_data[2] + ) + result.append(contact) + + return result + + except sqlite3.Error as e: + print(f"Database error: {e}") + return [] + finally: + if 'conn' in locals(): + conn.close() + +def list_messages( + date_range: Optional[Tuple[datetime, datetime]] = None, + sender_id: Optional[int] = None, + chat_id: Optional[int] = None, + query: Optional[str] = None, + limit: int = 20, + page: int = 0, + include_context: bool = True, + context_before: int = 1, + context_after: int = 1 +) -> List[Message]: + """Get messages matching the specified criteria with optional context.""" + try: + conn = sqlite3.connect(MESSAGES_DB_PATH) + cursor = conn.cursor() + + # Build base query + query_parts = [""" + SELECT + m.id, + m.chat_id, + c.title, + m.sender_name, + m.content, + m.timestamp, + m.is_from_me, + m.sender_id + FROM messages m + """] + query_parts.append("JOIN chats c ON m.chat_id = c.id") + where_clauses = [] + params = [] + + # Add filters + if date_range: + where_clauses.append("m.timestamp BETWEEN ? AND ?") + params.extend([date_range[0].isoformat(), date_range[1].isoformat()]) + + if sender_id: + where_clauses.append("m.sender_id = ?") + params.append(sender_id) + + if chat_id: + where_clauses.append("m.chat_id = ?") + params.append(chat_id) + + if query: + where_clauses.append("LOWER(m.content) LIKE LOWER(?)") + params.append(f"%{query}%") + + if where_clauses: + query_parts.append("WHERE " + " AND ".join(where_clauses)) + + # Add pagination + offset = page * limit + query_parts.append("ORDER BY m.timestamp DESC") + query_parts.append("LIMIT ? OFFSET ?") + params.extend([limit, offset]) + + cursor.execute(" ".join(query_parts), tuple(params)) + messages = cursor.fetchall() + + result = [] + for msg in messages: + message = Message( + id=msg[0], + chat_id=msg[1], + chat_title=msg[2], + sender_name=msg[3], + content=msg[4], + timestamp=datetime.fromisoformat(msg[5]), + is_from_me=bool(msg[6]), + sender_id=msg[7] + ) + result.append(message) + + if include_context and result: + # Add context for each message + messages_with_context = [] + for msg in result: + context = get_message_context(msg.id, msg.chat_id, context_before, context_after) + messages_with_context.extend(context.before) + messages_with_context.append(context.message) + messages_with_context.extend(context.after) + return messages_with_context + + return result + + except sqlite3.Error as e: + print(f"Database error: {e}") + return [] + finally: + if 'conn' in locals(): + conn.close() + +def get_message_context( + message_id: int, + chat_id: int, + before: int = 5, + after: int = 5 +) -> MessageContext: + """Get context around a specific message.""" + try: + conn = sqlite3.connect(MESSAGES_DB_PATH) + cursor = conn.cursor() + + # Get the target message first + cursor.execute(""" + SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id + FROM messages m + JOIN chats c ON m.chat_id = c.id + WHERE m.id = ? AND m.chat_id = ? + """, (message_id, chat_id)) + msg_data = cursor.fetchone() + + if not msg_data: + raise ValueError(f"Message with ID {message_id} in chat {chat_id} not found") + + target_message = Message( + id=msg_data[0], + chat_id=msg_data[1], + chat_title=msg_data[2], + sender_name=msg_data[3], + content=msg_data[4], + timestamp=datetime.fromisoformat(msg_data[5]), + is_from_me=bool(msg_data[6]), + sender_id=msg_data[7] + ) + + # Get messages before + cursor.execute(""" + SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id + FROM messages m + JOIN chats c ON m.chat_id = c.id + WHERE m.chat_id = ? AND m.timestamp < ? + ORDER BY m.timestamp DESC + LIMIT ? + """, (chat_id, target_message.timestamp.isoformat(), before)) + + before_messages = [] + for msg in cursor.fetchall(): + before_messages.append(Message( + id=msg[0], + chat_id=msg[1], + chat_title=msg[2], + sender_name=msg[3], + content=msg[4], + timestamp=datetime.fromisoformat(msg[5]), + is_from_me=bool(msg[6]), + sender_id=msg[7] + )) + + # Get messages after + cursor.execute(""" + SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id + FROM messages m + JOIN chats c ON m.chat_id = c.id + WHERE m.chat_id = ? AND m.timestamp > ? + ORDER BY m.timestamp ASC + LIMIT ? + """, (chat_id, target_message.timestamp.isoformat(), after)) + + after_messages = [] + for msg in cursor.fetchall(): + after_messages.append(Message( + id=msg[0], + chat_id=msg[1], + chat_title=msg[2], + sender_name=msg[3], + content=msg[4], + timestamp=datetime.fromisoformat(msg[5]), + is_from_me=bool(msg[6]), + sender_id=msg[7] + )) + + return MessageContext( + message=target_message, + before=before_messages, + after=after_messages + ) + + except sqlite3.Error as e: + print(f"Database error: {e}") + raise + finally: + if 'conn' in locals(): + conn.close() + +def list_chats( + query: Optional[str] = None, + limit: int = 20, + page: int = 0, + chat_type: Optional[str] = None, + sort_by: str = "last_active" +) -> List[Chat]: + """Get chats matching the specified criteria.""" + try: + conn = sqlite3.connect(MESSAGES_DB_PATH) + cursor = conn.cursor() + + # Build base query + query_parts = ["SELECT id, title, username, type, last_message_time FROM chats"] + + where_clauses = [] + params = [] + + if query: + where_clauses.append("(LOWER(title) LIKE LOWER(?) OR LOWER(username) LIKE LOWER(?))") + params.extend([f"%{query}%", f"%{query}%"]) + + if chat_type: + where_clauses.append("type = ?") + params.append(chat_type) + + if where_clauses: + query_parts.append("WHERE " + " AND ".join(where_clauses)) + + # Add sorting + order_by = "last_message_time DESC" if sort_by == "last_active" else "title" + query_parts.append(f"ORDER BY {order_by}") + + # Add pagination + offset = (page) * limit + query_parts.append("LIMIT ? OFFSET ?") + params.extend([limit, offset]) + + cursor.execute(" ".join(query_parts), tuple(params)) + chats = cursor.fetchall() + + result = [] + for chat_data in chats: + last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None + chat = Chat( + id=chat_data[0], + title=chat_data[1], + username=chat_data[2], + type=chat_data[3], + last_message_time=last_message_time + ) + result.append(chat) + + return result + + except sqlite3.Error as e: + print(f"Database error: {e}") + return [] + finally: + if 'conn' in locals(): + conn.close() + +def get_chat(chat_id: int) -> Optional[Chat]: + """Get chat metadata by ID.""" + try: + conn = sqlite3.connect(MESSAGES_DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + SELECT id, title, username, type, last_message_time + FROM chats + WHERE id = ? + """, (chat_id,)) + + chat_data = cursor.fetchone() + + if not chat_data: + return None + + last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None + return Chat( + id=chat_data[0], + title=chat_data[1], + username=chat_data[2], + type=chat_data[3], + last_message_time=last_message_time + ) + + except sqlite3.Error as e: + print(f"Database error: {e}") + return None + finally: + if 'conn' in locals(): + conn.close() + +def get_direct_chat_by_contact(contact_id: int) -> Optional[Chat]: + """Get direct chat metadata by contact ID.""" + try: + conn = sqlite3.connect(MESSAGES_DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + SELECT id, title, username, type, last_message_time + FROM chats + WHERE id = ? AND type = 'user' + """, (contact_id,)) + + chat_data = cursor.fetchone() + + if not chat_data: + return None + + last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None + return Chat( + id=chat_data[0], + title=chat_data[1], + username=chat_data[2], + type=chat_data[3], + last_message_time=last_message_time + ) + + except sqlite3.Error as e: + print(f"Database error: {e}") + return None + finally: + if 'conn' in locals(): + conn.close() + +def get_contact_chats(contact_id: int, limit: int = 20, page: int = 0) -> List[Chat]: + """Get all chats involving the contact. + + Args: + contact_id: The contact's ID to search for + limit: Maximum number of chats to return (default 20) + page: Page number for pagination (default 0) + """ + try: + conn = sqlite3.connect(MESSAGES_DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + SELECT DISTINCT + c.id, c.title, c.username, c.type, c.last_message_time + FROM chats c + JOIN messages m ON c.id = m.chat_id + WHERE m.sender_id = ? OR c.id = ? + ORDER BY c.last_message_time DESC + LIMIT ? OFFSET ? + """, (contact_id, contact_id, limit, page * limit)) + + chats = cursor.fetchall() + + result = [] + for chat_data in chats: + last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None + chat = Chat( + id=chat_data[0], + title=chat_data[1], + username=chat_data[2], + type=chat_data[3], + last_message_time=last_message_time + ) + result.append(chat) + + return result + + except sqlite3.Error as e: + print(f"Database error: {e}") + return [] + finally: + if 'conn' in locals(): + conn.close() + +def get_last_interaction(contact_id: int) -> Optional[Message]: + """Get most recent message involving the contact.""" + try: + conn = sqlite3.connect(MESSAGES_DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + SELECT + m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id + FROM messages m + JOIN chats c ON m.chat_id = c.id + WHERE m.sender_id = ? OR c.id = ? + ORDER BY m.timestamp DESC + LIMIT 1 + """, (contact_id, contact_id)) + + msg_data = cursor.fetchone() + + if not msg_data: + return None + + return Message( + id=msg_data[0], + chat_id=msg_data[1], + chat_title=msg_data[2], + sender_name=msg_data[3], + content=msg_data[4], + timestamp=datetime.fromisoformat(msg_data[5]), + is_from_me=bool(msg_data[6]), + sender_id=msg_data[7] + ) + + except sqlite3.Error as e: + print(f"Database error: {e}") + return None + finally: + if 'conn' in locals(): + conn.close() + +def send_message(recipient: str, message: str) -> Tuple[bool, str]: + """Send a Telegram message to the specified recipient. + + Args: + recipient: The recipient - either a username (with or without @), + or a chat ID as a string or integer + message: The message text to send + + Returns: + Tuple[bool, str]: A tuple containing success status and a status message + """ + try: + # Validate input + if not recipient: + return False, "Recipient must be provided" + + url = f"{TELEGRAM_API_BASE_URL}/send" + payload = { + "recipient": recipient, + "message": message + } + + response = requests.post(url, json=payload) + + # Check if the request was successful + if response.status_code == 200: + result = response.json() + return result.get("success", False), result.get("message", "Unknown response") + else: + return False, f"Error: HTTP {response.status_code} - {response.text}" + + except requests.RequestException as e: + return False, f"Request error: {str(e)}" + except json.JSONDecodeError: + return False, f"Error parsing response: Unknown" + except Exception as e: + return False, f"Unexpected error: {str(e)}" \ No newline at end of file