Initial commit: Telegram MCP server and bridge

This commit is contained in:
Muhammad18557 2025-04-04 15:42:47 +08:00
commit f6a7d983e5
8 changed files with 1632 additions and 0 deletions

58
.gitignore vendored Normal file
View file

@ -0,0 +1,58 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Virtual environments
myenv/
venv/
ENV/
env/
.env
# Telegram session files
telegram-bridge/store/*.session
telegram-bridge/store/*.session-journal
# Database files with messages
telegram-bridge/store/*.db
telegram-bridge/store/*.db-shm
telegram-bridge/store/*.db-wal
# Logs
logs/
*.log
# IDE files
.idea/
.vscode/
*.swp
*.swo
.DS_Store
# Sensitive information
telegram-bridge/.env
*.key
*.pem
*.crt
# User-specific files
claude_desktop_config.json
mcp.json

177
README.md Normal file
View file

@ -0,0 +1,177 @@
# Telegram MCP Server
This is a Model Context Protocol (MCP) server for Telegram.
With this you can search your personal Telegram messages, search your contacts, and send messages to either individuals or groups.
It connects to your **personal Telegram account** directly via the Telegram API (using the [Telethon](https://github.com/LonamiWebs/Telethon) library). All your messages are stored locally in a SQLite database and only sent to an LLM (such as Claude) when the agent accesses them through tools (which you control).
## Installation
### Prerequisites
- Python 3.6+
- Anthropic Claude Desktop app (or Cursor)
### Steps
1. **Clone this repository**
```bash
git clone https://github.com/yourusername/telegram-mcp.git
cd telegram-mcp
```
Create a Python virtual environment and activate it:
```bash
python3 -m venv myenv
source myenv/bin/activate
```
Install dependencies:
```bash
pip install -r requirements.txt
```
This is the environment that can be used both for running the **Telegram bridge** and the **MCP server**.
2. **Get your Telegram API credentials**
- Go to https://my.telegram.org/auth
- Log in and go to "API development tools"
- Create a new application
- Note your API ID and API hash
3. **Run the Telegram bridge**
Navigate to the telegram-bridge directory:
```bash
cd telegram-bridge
```
Set up your API credentials as environment variables either by exporting them:
```bash
export TELEGRAM_API_ID=your_api_id
export TELEGRAM_API_HASH=your_api_hash
```
Or by creating a `.env` file based on the provided `.env.example`:
```bash
cp .env.example .env
nano .env # or use your preferred text editor
```
Then update the values in the `.env` file:
```
TELEGRAM_API_ID=your_api_id
TELEGRAM_API_HASH=your_api_hash
```
Run the Python application:
```bash
python main.py
```
The first time you run it, you will be prompted to enter your phone number and the verification code sent to your Telegram account.
4. **Connect to the MCP server**
First, update the `run_telegram_server.sh` script with your absolute repository path:
```bash
# Open the script in your preferred editor
nano run_telegram_server.sh
# Update line 4 with your absolute path to the repository
# Change this:
BASE_DIR="/Users/muhammadabdullah/Desktop/mcp/telegram-mcp"
# To your actual BASE_DIR path (get it by running `pwd` in the telegram-mcp directory)
```
Then, configure the MCP server by creating a JSON configuration file with the following format:
```json
{
"mcpServers": {
"telegram": {
"command": "/bin/bash",
"args": [
"{{BASE_DIR}}/run_telegram_server.sh" // BASE_DIR is the same as above
]
}
}
}
```
For **Claude**, save this as `claude_desktop_config.json` in your Claude Desktop configuration directory at:
```
~/Library/Application\ Support/Claude/claude_desktop_config.json
```
For **Cursor**, save this as `mcp.json` in your Cursor configuration directory at:
```
~/.cursor/mcp.json
```
5. **Restart Claude Desktop / Cursor**
Open Claude Desktop and you should now see Telegram as an available integration.
Or restart Cursor.
## Architecture Overview
This application consists of two main components:
1. **Python Telegram Bridge** (`telegram-bridge/`): A Python application that connects to Telegram's API, handles authentication, and stores message history in SQLite. It serves as the bridge between Telegram and the MCP server. Can real-time sync for latest messages.
2. **Python MCP Server** (`telegram-mcp-server/`): A Python server implementing the Model Context Protocol (MCP), which provides standardized tools for Claude to interact with Telegram data and send/receive messages.
### Data Storage
- All message history is stored in a SQLite database within the `telegram-bridge/store/` directory
- The database maintains tables for chats and messages
- Messages are indexed for efficient searching and retrieval
## Usage
Once connected, you can interact with your Telegram contacts through Claude, leveraging Claude's AI capabilities in your Telegram conversations.
### MCP Tools
Claude can access the following tools to interact with Telegram:
- **search_contacts**: Search for contacts by name or username
- **list_messages**: Retrieve messages with optional filters and context
- **list_chats**: List available chats with metadata
- **get_chat**: Get information about a specific chat
- **get_direct_chat_by_contact**: Find a direct chat with a specific contact
- **get_contact_chats**: List all chats involving a specific contact
- **get_last_interaction**: Get the most recent message with a contact
- **get_message_context**: Retrieve context around a specific message
- **send_message**: Send a Telegram message to a specified username or chat ID
## Technical Details
1. Claude sends requests to the Python MCP server
2. The MCP server queries the Telegram bridge or directly the SQLite database
3. The bridge accesses the Telegram API and keeps the SQLite database up to date
4. Data flows back through the chain to Claude
5. When sending messages, the request flows from Claude through the MCP server to the Telegram bridge and to Telegram
## Troubleshooting
- Make sure both the Telegram bridge and the Python server are running for the integration to work properly.
### Authentication Issues
- **Session expired**: If your session expires, you might need to re-authenticate. Delete the `telegram-bridge/store/telegram_session.session` file and restart the bridge.
- **Two-factor authentication**: If you have 2FA enabled on your Telegram account, you'll be prompted for your password during authentication.
- **No Messages Loading**: After initial authentication, it can take several minutes for your message history to load, especially if you have many chats.
For additional Claude Desktop integration troubleshooting, see the [MCP documentation](https://modelcontextprotocol.io/quickstart/server#claude-for-desktop-integration-issues).

9
requirements.txt Normal file
View file

@ -0,0 +1,9 @@
# Telegram Bridge dependencies
telethon
cryptg
python-dotenv
requests
# MCP Server dependencies
fastmcp
python-dateutil

17
run_telegram_server.sh Executable file
View file

@ -0,0 +1,17 @@
#!/bin/bash
# Define the base directory and environment path
BASE_DIR="/Users/muhammadabdullah/Desktop/mcp/telegram-mcp"
ENV_PATH="$BASE_DIR/myenv"
# Check if the virtual environment exists
if [ ! -d "$ENV_PATH" ]; then
python3 -m venv "$ENV_PATH"
source "$ENV_PATH/bin/activate"
pip install -r "$BASE_DIR/requirements.txt" > /dev/null 2>&1
else
source "$ENV_PATH/bin/activate"
fi
# Run the MCP server
python "$BASE_DIR/telegram-mcp-server/main.py"

View file

@ -0,0 +1,4 @@
# Telegram API credentials
# Get these from https://my.telegram.org/auth
TELEGRAM_API_ID=your_app_id
TELEGRAM_API_HASH=your_hash

610
telegram-bridge/main.py Normal file
View file

@ -0,0 +1,610 @@
"""
Telegram Bridge Module
This module connects to the Telegram API using the Telethon library, enabling
interaction with Telegram chats and messages. It provides an HTTP server for
sending messages to Telegram users or groups, and stores message history in
a SQLite database.
Key Features:
- Connects to Telegram using API credentials.
- Stores chat and message data in a database.
- Provides an HTTP API for sending messages.
- Synchronizes message history and processes new messages.
Usage:
- Ensure TELEGRAM_API_ID and TELEGRAM_API_HASH are set in environment variables.
- Run the script to start the Telegram bridge and HTTP server.
- Use the '/api/send' endpoint to send messages via HTTP requests.
"""
import os
import sqlite3
import json
import asyncio
import logging
import sys
from datetime import datetime
from typing import Dict, Any, Optional, List, Tuple
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
from dotenv import load_dotenv
from telethon import TelegramClient, events
from telethon.tl.types import (
User,
Chat,
Channel,
Message,
Dialog,
)
from telethon.utils import get_display_name
# Global variable to store the main event loop
main_loop = None
# Load environment variables from .env file if it exists
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)
# Directory for storing data
STORE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "store")
os.makedirs(STORE_DIR, exist_ok=True)
# Database path
DB_PATH = os.path.join(STORE_DIR, "messages.db")
# API credentials from environment variables
API_ID = os.getenv("TELEGRAM_API_ID")
API_HASH = os.getenv("TELEGRAM_API_HASH")
if not API_ID or not API_HASH:
logger.error(
"TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables must be set"
)
logger.error("Get them from https://my.telegram.org/auth")
sys.exit(1)
# Initialize the Telegram client
SESSION_FILE = os.path.join(STORE_DIR, "telegram_session")
client = TelegramClient(SESSION_FILE, API_ID, API_HASH)
class MessageStore:
"""Handles storage and retrieval of Telegram messages in SQLite."""
def __init__(self, db_path: str):
"""Initialize the message store with the given database path."""
self.db_path = db_path
self.init_db()
def init_db(self):
"""Initialize the database with necessary tables."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create tables if they don't exist
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS chats (
id INTEGER PRIMARY KEY,
title TEXT,
username TEXT,
type TEXT,
last_message_time TIMESTAMP
)
"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER,
chat_id INTEGER,
sender_id INTEGER,
sender_name TEXT,
content TEXT,
timestamp TIMESTAMP,
is_from_me BOOLEAN,
PRIMARY KEY (id, chat_id),
FOREIGN KEY (chat_id) REFERENCES chats(id)
)
"""
)
# Create indexes for efficient queries
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_messages_chat_id ON messages(chat_id)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_messages_content ON messages(content)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_messages_sender_id ON messages(sender_id)"
)
conn.commit()
conn.close()
def store_chat(
self,
chat_id: int,
title: str,
username: Optional[str],
chat_type: str,
last_message_time: datetime,
) -> None:
"""Store a chat in the database."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO chats (id, title, username, type, last_message_time) VALUES (?, ?, ?, ?, ?)",
(chat_id, title, username, chat_type, last_message_time.isoformat()),
)
conn.commit()
conn.close()
def store_message(
self,
message_id: int,
chat_id: int,
sender_id: int,
sender_name: str,
content: str,
timestamp: datetime,
is_from_me: bool,
) -> None:
"""Store a message in the database."""
if not content: # Skip empty messages
return
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"""INSERT OR REPLACE INTO messages
(id, chat_id, sender_id, sender_name, content, timestamp, is_from_me)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
message_id,
chat_id,
sender_id,
sender_name,
content,
timestamp.isoformat(),
is_from_me,
),
)
conn.commit()
conn.close()
def get_messages(
self,
chat_id: Optional[int] = None,
limit: int = 50,
query: Optional[str] = None,
offset: int = 0,
) -> List[Dict[str, Any]]:
"""Get messages from the database."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
query_parts = [
"SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id FROM messages m"
]
query_parts.append("JOIN chats c ON m.chat_id = c.id")
conditions = []
params = []
if chat_id:
conditions.append("m.chat_id = ?")
params.append(chat_id)
if query:
conditions.append("m.content LIKE ?")
params.append(f"%{query}%")
if conditions:
query_parts.append("WHERE " + " AND ".join(conditions))
query_parts.append("ORDER BY m.timestamp DESC")
query_parts.append("LIMIT ? OFFSET ?")
params.extend([limit, offset])
cursor.execute(" ".join(query_parts), tuple(params))
messages = cursor.fetchall()
results = []
for msg in messages:
timestamp = datetime.fromisoformat(msg[5])
results.append(
{
"id": msg[0],
"chat_id": msg[1],
"chat_title": msg[2],
"sender_name": msg[3],
"content": msg[4],
"timestamp": timestamp,
"is_from_me": msg[6],
"sender_id": msg[7],
}
)
conn.close()
return results
def get_chats(
self, limit: int = 50, query: Optional[str] = None
) -> List[Dict[str, Any]]:
"""Get chats from the database."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
query_parts = ["SELECT id, title, username, type, last_message_time FROM chats"]
params = []
if query:
query_parts.append("WHERE title LIKE ? OR username LIKE ?")
params.extend([f"%{query}%", f"%{query}%"])
query_parts.append("ORDER BY last_message_time DESC")
query_parts.append("LIMIT ?")
params.append(limit)
cursor.execute(" ".join(query_parts), tuple(params))
chats = cursor.fetchall()
results = []
for chat in chats:
last_message_time = datetime.fromisoformat(chat[4]) if chat[4] else None
results.append(
{
"id": chat[0],
"title": chat[1],
"username": chat[2],
"type": chat[3],
"last_message_time": last_message_time,
}
)
conn.close()
return results
# Create message store
message_store = MessageStore(DB_PATH)
async def process_message(message: Message) -> None:
"""Process and store a message."""
if not message.text:
return # Skip non-text messages
# Get the chat
chat = message.chat
if not chat:
return
chat_id = message.chat_id
# Determine chat type and name
if isinstance(chat, User):
chat_type = "user"
title = get_display_name(chat)
username = chat.username
elif isinstance(chat, Chat):
chat_type = "group"
title = chat.title
username = None
elif isinstance(chat, Channel):
chat_type = "channel" if chat.broadcast else "supergroup"
title = chat.title
username = chat.username
else:
logger.warning(f"Unknown chat type: {type(chat)}")
return
# Store chat information
message_store.store_chat(
chat_id=chat_id,
title=title,
username=username,
chat_type=chat_type,
last_message_time=message.date,
)
# Get sender information
sender = await message.get_sender()
sender_id = sender.id if sender else 0
sender_name = get_display_name(sender) if sender else "Unknown"
# Check if the message is from the current user
my_id = (await client.get_me()).id
is_from_me = sender_id == my_id
# Store the message
message_store.store_message(
message_id=message.id,
chat_id=chat_id,
sender_id=sender_id,
sender_name=sender_name,
content=message.text,
timestamp=message.date,
is_from_me=is_from_me,
)
logger.info(
f"Stored message: [{message.date}] {sender_name} in {title}: {message.text[:30]}..."
)
async def sync_dialog_history(dialog: Dialog, limit: int = 100) -> None:
"""Sync message history for a specific dialog."""
chat_entity = dialog.entity
# Extract chat info
if isinstance(chat_entity, User):
chat_type = "user"
title = get_display_name(chat_entity)
username = chat_entity.username
elif isinstance(chat_entity, Chat):
chat_type = "group"
title = chat_entity.title
username = None
elif isinstance(chat_entity, Channel):
chat_type = "channel" if chat_entity.broadcast else "supergroup"
title = chat_entity.title
username = chat_entity.username
else:
logger.warning(f"Unknown chat type: {type(chat_entity)}")
return
# Store chat info with last message time
message_store.store_chat(
chat_id=dialog.id,
title=title,
username=username,
chat_type=chat_type,
last_message_time=dialog.date,
)
# Get messages
messages = await client.get_messages(dialog.entity, limit=limit)
# Get current user ID
my_id = (await client.get_me()).id
# Process each message
for message in messages:
if not message.text:
continue # Skip non-text messages
# Get sender information
try:
sender = await message.get_sender()
sender_id = sender.id if sender else 0
sender_name = get_display_name(sender) if sender else "Unknown"
except Exception as e:
logger.error(f"Error getting sender: {e}")
sender_id = 0
sender_name = "Unknown"
is_from_me = sender_id == my_id
# Store the message
message_store.store_message(
message_id=message.id,
chat_id=dialog.id,
sender_id=sender_id,
sender_name=sender_name,
content=message.text,
timestamp=message.date,
is_from_me=is_from_me,
)
logger.info(f"Synced {len(messages)} messages from {title}")
async def sync_all_dialogs() -> None:
"""Sync message history for all dialogs."""
logger.info("Starting synchronization of all dialogs")
# Get all dialogs (chats)
dialogs = await client.get_dialogs(limit=100)
for dialog in dialogs:
try:
await sync_dialog_history(dialog)
except Exception as e:
logger.error(f"Error syncing dialog {dialog.name}: {e}")
logger.info(f"Completed synchronization of {len(dialogs)} dialogs")
# HTTP server for API endpoints
class TelegramAPIHandler(BaseHTTPRequestHandler):
def do_POST(self):
content_length = int(self.headers["Content-Length"])
post_data = self.rfile.read(content_length)
request = json.loads(post_data.decode("utf-8"))
if self.path == "/api/send":
self._handle_send_message(request)
else:
self.send_error(404, "Endpoint not found")
def _handle_send_message(self, request):
recipient = request.get("recipient")
message_text = request.get("message")
if not recipient or not message_text:
self._send_json_response(
400, {"success": False, "message": "Recipient and message are required"}
)
return
try:
# Instead of creating a new event loop, use a shared queue to communicate with the main thread
# Create a Future to hold the result
future = asyncio.run_coroutine_threadsafe(
send_message(recipient, message_text), main_loop
)
# Wait for the result (with timeout)
try:
success, message = future.result(10) # Wait up to 10 seconds
self._send_json_response(
200 if success else 500, {"success": success, "message": message}
)
except asyncio.TimeoutError:
self._send_json_response(
504,
{
"success": False,
"message": "Request timed out while sending message",
},
)
except Exception as inner_e:
logger.error(f"Error in Future: {inner_e}")
self._send_json_response(
500,
{"success": False, "message": f"Error in Future: {str(inner_e)}"},
)
except Exception as e:
logger.error(f"Error sending message: {e}")
self._send_json_response(
500, {"success": False, "message": f"Error: {str(e)}"}
)
def _send_json_response(self, status_code, data):
self.send_response(status_code)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(data).encode("utf-8"))
async def send_message(recipient: str, message: str) -> Tuple[bool, str]:
"""Send a message to a Telegram recipient."""
if not client.is_connected():
return False, "Not connected to Telegram"
try:
# Try to parse recipient as an integer (chat ID)
try:
chat_id = int(recipient)
entity = await client.get_entity(chat_id)
except ValueError:
# Not an integer, try as username
if recipient.startswith("@"):
recipient = recipient[1:] # Remove @ if present
try:
entity = await client.get_entity(recipient)
except Exception:
# Try to find in database
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute(
"SELECT id FROM chats WHERE title LIKE ? OR username = ?",
(f"%{recipient}%", recipient),
)
result = cursor.fetchone()
conn.close()
if result:
entity = await client.get_entity(result[0])
else:
return False, f"Recipient not found: {recipient}"
# Send the message
await client.send_message(entity, message)
return True, f"Message sent to {recipient}"
except Exception as e:
logger.error(f"Error sending message: {e}")
return False, f"Error sending message: {str(e)}"
# Start HTTP server in a separate thread
def start_http_server(port: int = 8081):
server_address = ("", port)
httpd = HTTPServer(server_address, TelegramAPIHandler)
logger.info(f"Starting HTTP server on port {port}")
httpd.serve_forever()
async def main():
global main_loop
# Store the current event loop
main_loop = asyncio.get_event_loop()
logger.info("Starting Telegram bridge")
# Connect to Telegram
await client.connect()
# Check if we're already authorized
if not await client.is_user_authorized():
logger.info("Need to log in. Please enter your phone number:")
phone = input("Phone number: ")
await client.send_code_request(phone)
logger.info("Code sent. Please enter the code you received:")
code = input("Code: ")
try:
await client.sign_in(phone, code)
except Exception as e:
logger.error(f"Error signing in: {e}")
logger.info(
"If you have two-factor authentication enabled, please enter your password:"
)
password = input("Password: ")
await client.sign_in(password=password)
logger.info("Successfully logged in to Telegram")
# Start HTTP server in a separate thread
server_thread = threading.Thread(target=start_http_server)
server_thread.daemon = True
server_thread.start()
# Register event handler for new messages
@client.on(events.NewMessage)
async def handle_new_message(event):
await process_message(event.message)
# Initial sync of message history
await sync_all_dialogs()
# Keep the script running
logger.info("Telegram bridge is running. Press Ctrl+C to exit.")
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
pass
if __name__ == "__main__":
# Run the main function
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Shutting down Telegram bridge")
except Exception as e:
logger.error(f"Unexpected error: {e}")
sys.exit(1)

193
telegram-mcp-server/main.py Normal file
View file

@ -0,0 +1,193 @@
from typing import List, Dict, Any, Optional, Tuple
from mcp.server.fastmcp import FastMCP
from datetime import datetime
from telegram import (
search_contacts as telegram_search_contacts,
list_messages as telegram_list_messages,
list_chats as telegram_list_chats,
get_chat as telegram_get_chat,
get_direct_chat_by_contact as telegram_get_direct_chat_by_contact,
get_contact_chats as telegram_get_contact_chats,
get_last_interaction as telegram_get_last_interaction,
get_message_context as telegram_get_message_context,
send_message as telegram_send_message
)
# Initialize FastMCP server
mcp = FastMCP("telegram")
@mcp.tool()
def search_contacts(query: str) -> List[Dict[str, Any]]:
"""Search Telegram contacts by name or username.
Args:
query: Search term to match against contact names or usernames
"""
contacts = telegram_search_contacts(query)
return contacts
@mcp.tool()
def list_messages(
date_range: Optional[Tuple[datetime, datetime]] = None,
sender_id: Optional[int] = None,
chat_id: Optional[int] = None,
query: Optional[str] = None,
limit: int = 20,
page: int = 0,
include_context: bool = True,
context_before: int = 1,
context_after: int = 1
) -> List[Dict[str, Any]]:
"""Get Telegram messages matching specified criteria with optional context.
Args:
date_range: Optional tuple of (start_date, end_date) to filter messages by date
sender_id: Optional sender ID to filter messages by sender
chat_id: Optional chat ID to filter messages by chat
query: Optional search term to filter messages by content
limit: Maximum number of messages to return (default 20)
page: Page number for pagination (default 0)
include_context: Whether to include messages before and after matches (default True)
context_before: Number of messages to include before each match (default 1)
context_after: Number of messages to include after each match (default 1)
"""
messages = telegram_list_messages(
date_range=date_range,
sender_id=sender_id,
chat_id=chat_id,
query=query,
limit=limit,
page=page,
include_context=include_context,
context_before=context_before,
context_after=context_after
)
return messages
@mcp.tool()
def list_chats(
query: Optional[str] = None,
limit: int = 20,
page: int = 0,
chat_type: Optional[str] = None,
sort_by: str = "last_active"
) -> List[Dict[str, Any]]:
"""Get Telegram chats matching specified criteria.
Args:
query: Optional search term to filter chats by name or username
limit: Maximum number of chats to return (default 20)
page: Page number for pagination (default 0)
chat_type: Optional chat type filter ("user", "group", "channel", or "supergroup")
sort_by: Field to sort results by, either "last_active" or "title" (default "last_active")
"""
chats = telegram_list_chats(
query=query,
limit=limit,
page=page,
chat_type=chat_type,
sort_by=sort_by
)
return chats
@mcp.tool()
def get_chat(chat_id: int) -> Dict[str, Any]:
"""Get Telegram chat metadata by ID.
Args:
chat_id: The ID of the chat to retrieve
"""
chat = telegram_get_chat(chat_id)
return chat
@mcp.tool()
def get_direct_chat_by_contact(contact_id: int) -> Dict[str, Any]:
"""Get Telegram chat metadata by contact ID.
Args:
contact_id: The contact ID to search for
"""
chat = telegram_get_direct_chat_by_contact(contact_id)
return chat
@mcp.tool()
def get_contact_chats(contact_id: int, limit: int = 20, page: int = 0) -> List[Dict[str, Any]]:
"""Get all Telegram chats involving the contact.
Args:
contact_id: The contact's ID to search for
limit: Maximum number of chats to return (default 20)
page: Page number for pagination (default 0)
"""
chats = telegram_get_contact_chats(contact_id, limit, page)
return chats
@mcp.tool()
def get_last_interaction(contact_id: int) -> Dict[str, Any]:
"""Get most recent Telegram message involving the contact.
Args:
contact_id: The ID of the contact to search for
"""
message = telegram_get_last_interaction(contact_id)
return message
@mcp.tool()
def get_message_context(
message_id: int,
chat_id: int,
before: int = 5,
after: int = 5
) -> Dict[str, Any]:
"""Get context around a specific Telegram message.
Args:
message_id: The ID of the message to get context for
chat_id: The ID of the chat containing the message
before: Number of messages to include before the target message (default 5)
after: Number of messages to include after the target message (default 5)
"""
context = telegram_get_message_context(message_id, chat_id, before, after)
return context
@mcp.tool()
def send_message(
recipient: str,
message: str
) -> Dict[str, Any]:
"""Send a Telegram message to a person or group.
Args:
recipient: The recipient - either a username (with or without @) or a chat ID
message: The message text to send
Returns:
A dictionary containing success status and a status message
"""
# Validate input
if not recipient:
return {
"success": False,
"message": "Recipient must be provided"
}
# Call the telegram_send_message function
success, status_message = telegram_send_message(recipient, message)
return {
"success": success,
"message": status_message
}
if __name__ == "__main__":
# Redirect stdout/stderr to suppress initial output that might confuse Claude
import sys
import os
# Create logs directory
os.makedirs(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'logs'), exist_ok=True)
# Redirect stderr to log file
sys.stderr = open(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'logs', 'mcp_error.log'), 'w')
# Initialize and run the server
mcp.run(transport='stdio')

View file

@ -0,0 +1,564 @@
import sqlite3
import requests
import json
import os.path
from datetime import datetime
from dataclasses import dataclass
from typing import Optional, List, Tuple, Dict, Any
# Database path
MESSAGES_DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'telegram-bridge', 'store', 'messages.db')
TELEGRAM_API_BASE_URL = "http://localhost:8081/api"
@dataclass
class Message:
id: int
chat_id: int
chat_title: str
sender_name: str
content: str
timestamp: datetime
is_from_me: bool
sender_id: int
@dataclass
class Chat:
id: int
title: str
username: Optional[str]
type: str
last_message_time: Optional[datetime]
@dataclass
class Contact:
id: int
username: Optional[str]
name: str
@dataclass
class MessageContext:
message: Message
before: List[Message]
after: List[Message]
def print_message(message: Message, show_chat_info: bool = True) -> None:
"""Print a single message with consistent formatting."""
direction = "" if message.is_from_me else ""
if show_chat_info:
print(f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] {direction} Chat: {message.chat_title} (ID: {message.chat_id})")
else:
print(f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] {direction}")
print(f"From: {'Me' if message.is_from_me else message.sender_name}")
print(f"Message: {message.content}")
print("-" * 100)
def print_messages_list(messages: List[Message], title: str = "", show_chat_info: bool = True) -> None:
"""Print a list of messages with a title and consistent formatting."""
if not messages:
print("No messages to display.")
return
if title:
print(f"\n{title}")
print("-" * 100)
for message in messages:
print_message(message, show_chat_info)
def print_chat(chat: Chat) -> None:
"""Print a single chat with consistent formatting."""
print(f"Chat: {chat.title} (ID: {chat.id})")
print(f"Type: {chat.type}")
if chat.username:
print(f"Username: @{chat.username}")
if chat.last_message_time:
print(f"Last active: {chat.last_message_time:%Y-%m-%d %H:%M:%S}")
print("-" * 100)
def print_chats_list(chats: List[Chat], title: str = "") -> None:
"""Print a list of chats with a title and consistent formatting."""
if not chats:
print("No chats to display.")
return
if title:
print(f"\n{title}")
print("-" * 100)
for chat in chats:
print_chat(chat)
def search_contacts(query: str) -> List[Contact]:
"""Search contacts by name or username."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# Search in chats where type is 'user'
cursor.execute("""
SELECT id, username, title
FROM chats
WHERE type = 'user' AND (title LIKE ? OR username LIKE ?)
ORDER BY title
LIMIT 50
""", (f"%{query}%", f"%{query}%"))
contacts = cursor.fetchall()
result = []
for contact_data in contacts:
contact = Contact(
id=contact_data[0],
username=contact_data[1],
name=contact_data[2]
)
result.append(contact)
return result
except sqlite3.Error as e:
print(f"Database error: {e}")
return []
finally:
if 'conn' in locals():
conn.close()
def list_messages(
date_range: Optional[Tuple[datetime, datetime]] = None,
sender_id: Optional[int] = None,
chat_id: Optional[int] = None,
query: Optional[str] = None,
limit: int = 20,
page: int = 0,
include_context: bool = True,
context_before: int = 1,
context_after: int = 1
) -> List[Message]:
"""Get messages matching the specified criteria with optional context."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# Build base query
query_parts = ["""
SELECT
m.id,
m.chat_id,
c.title,
m.sender_name,
m.content,
m.timestamp,
m.is_from_me,
m.sender_id
FROM messages m
"""]
query_parts.append("JOIN chats c ON m.chat_id = c.id")
where_clauses = []
params = []
# Add filters
if date_range:
where_clauses.append("m.timestamp BETWEEN ? AND ?")
params.extend([date_range[0].isoformat(), date_range[1].isoformat()])
if sender_id:
where_clauses.append("m.sender_id = ?")
params.append(sender_id)
if chat_id:
where_clauses.append("m.chat_id = ?")
params.append(chat_id)
if query:
where_clauses.append("LOWER(m.content) LIKE LOWER(?)")
params.append(f"%{query}%")
if where_clauses:
query_parts.append("WHERE " + " AND ".join(where_clauses))
# Add pagination
offset = page * limit
query_parts.append("ORDER BY m.timestamp DESC")
query_parts.append("LIMIT ? OFFSET ?")
params.extend([limit, offset])
cursor.execute(" ".join(query_parts), tuple(params))
messages = cursor.fetchall()
result = []
for msg in messages:
message = Message(
id=msg[0],
chat_id=msg[1],
chat_title=msg[2],
sender_name=msg[3],
content=msg[4],
timestamp=datetime.fromisoformat(msg[5]),
is_from_me=bool(msg[6]),
sender_id=msg[7]
)
result.append(message)
if include_context and result:
# Add context for each message
messages_with_context = []
for msg in result:
context = get_message_context(msg.id, msg.chat_id, context_before, context_after)
messages_with_context.extend(context.before)
messages_with_context.append(context.message)
messages_with_context.extend(context.after)
return messages_with_context
return result
except sqlite3.Error as e:
print(f"Database error: {e}")
return []
finally:
if 'conn' in locals():
conn.close()
def get_message_context(
message_id: int,
chat_id: int,
before: int = 5,
after: int = 5
) -> MessageContext:
"""Get context around a specific message."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# Get the target message first
cursor.execute("""
SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id
FROM messages m
JOIN chats c ON m.chat_id = c.id
WHERE m.id = ? AND m.chat_id = ?
""", (message_id, chat_id))
msg_data = cursor.fetchone()
if not msg_data:
raise ValueError(f"Message with ID {message_id} in chat {chat_id} not found")
target_message = Message(
id=msg_data[0],
chat_id=msg_data[1],
chat_title=msg_data[2],
sender_name=msg_data[3],
content=msg_data[4],
timestamp=datetime.fromisoformat(msg_data[5]),
is_from_me=bool(msg_data[6]),
sender_id=msg_data[7]
)
# Get messages before
cursor.execute("""
SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id
FROM messages m
JOIN chats c ON m.chat_id = c.id
WHERE m.chat_id = ? AND m.timestamp < ?
ORDER BY m.timestamp DESC
LIMIT ?
""", (chat_id, target_message.timestamp.isoformat(), before))
before_messages = []
for msg in cursor.fetchall():
before_messages.append(Message(
id=msg[0],
chat_id=msg[1],
chat_title=msg[2],
sender_name=msg[3],
content=msg[4],
timestamp=datetime.fromisoformat(msg[5]),
is_from_me=bool(msg[6]),
sender_id=msg[7]
))
# Get messages after
cursor.execute("""
SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id
FROM messages m
JOIN chats c ON m.chat_id = c.id
WHERE m.chat_id = ? AND m.timestamp > ?
ORDER BY m.timestamp ASC
LIMIT ?
""", (chat_id, target_message.timestamp.isoformat(), after))
after_messages = []
for msg in cursor.fetchall():
after_messages.append(Message(
id=msg[0],
chat_id=msg[1],
chat_title=msg[2],
sender_name=msg[3],
content=msg[4],
timestamp=datetime.fromisoformat(msg[5]),
is_from_me=bool(msg[6]),
sender_id=msg[7]
))
return MessageContext(
message=target_message,
before=before_messages,
after=after_messages
)
except sqlite3.Error as e:
print(f"Database error: {e}")
raise
finally:
if 'conn' in locals():
conn.close()
def list_chats(
query: Optional[str] = None,
limit: int = 20,
page: int = 0,
chat_type: Optional[str] = None,
sort_by: str = "last_active"
) -> List[Chat]:
"""Get chats matching the specified criteria."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# Build base query
query_parts = ["SELECT id, title, username, type, last_message_time FROM chats"]
where_clauses = []
params = []
if query:
where_clauses.append("(LOWER(title) LIKE LOWER(?) OR LOWER(username) LIKE LOWER(?))")
params.extend([f"%{query}%", f"%{query}%"])
if chat_type:
where_clauses.append("type = ?")
params.append(chat_type)
if where_clauses:
query_parts.append("WHERE " + " AND ".join(where_clauses))
# Add sorting
order_by = "last_message_time DESC" if sort_by == "last_active" else "title"
query_parts.append(f"ORDER BY {order_by}")
# Add pagination
offset = (page) * limit
query_parts.append("LIMIT ? OFFSET ?")
params.extend([limit, offset])
cursor.execute(" ".join(query_parts), tuple(params))
chats = cursor.fetchall()
result = []
for chat_data in chats:
last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None
chat = Chat(
id=chat_data[0],
title=chat_data[1],
username=chat_data[2],
type=chat_data[3],
last_message_time=last_message_time
)
result.append(chat)
return result
except sqlite3.Error as e:
print(f"Database error: {e}")
return []
finally:
if 'conn' in locals():
conn.close()
def get_chat(chat_id: int) -> Optional[Chat]:
"""Get chat metadata by ID."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT id, title, username, type, last_message_time
FROM chats
WHERE id = ?
""", (chat_id,))
chat_data = cursor.fetchone()
if not chat_data:
return None
last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None
return Chat(
id=chat_data[0],
title=chat_data[1],
username=chat_data[2],
type=chat_data[3],
last_message_time=last_message_time
)
except sqlite3.Error as e:
print(f"Database error: {e}")
return None
finally:
if 'conn' in locals():
conn.close()
def get_direct_chat_by_contact(contact_id: int) -> Optional[Chat]:
"""Get direct chat metadata by contact ID."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT id, title, username, type, last_message_time
FROM chats
WHERE id = ? AND type = 'user'
""", (contact_id,))
chat_data = cursor.fetchone()
if not chat_data:
return None
last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None
return Chat(
id=chat_data[0],
title=chat_data[1],
username=chat_data[2],
type=chat_data[3],
last_message_time=last_message_time
)
except sqlite3.Error as e:
print(f"Database error: {e}")
return None
finally:
if 'conn' in locals():
conn.close()
def get_contact_chats(contact_id: int, limit: int = 20, page: int = 0) -> List[Chat]:
"""Get all chats involving the contact.
Args:
contact_id: The contact's ID to search for
limit: Maximum number of chats to return (default 20)
page: Page number for pagination (default 0)
"""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT
c.id, c.title, c.username, c.type, c.last_message_time
FROM chats c
JOIN messages m ON c.id = m.chat_id
WHERE m.sender_id = ? OR c.id = ?
ORDER BY c.last_message_time DESC
LIMIT ? OFFSET ?
""", (contact_id, contact_id, limit, page * limit))
chats = cursor.fetchall()
result = []
for chat_data in chats:
last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None
chat = Chat(
id=chat_data[0],
title=chat_data[1],
username=chat_data[2],
type=chat_data[3],
last_message_time=last_message_time
)
result.append(chat)
return result
except sqlite3.Error as e:
print(f"Database error: {e}")
return []
finally:
if 'conn' in locals():
conn.close()
def get_last_interaction(contact_id: int) -> Optional[Message]:
"""Get most recent message involving the contact."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT
m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id
FROM messages m
JOIN chats c ON m.chat_id = c.id
WHERE m.sender_id = ? OR c.id = ?
ORDER BY m.timestamp DESC
LIMIT 1
""", (contact_id, contact_id))
msg_data = cursor.fetchone()
if not msg_data:
return None
return Message(
id=msg_data[0],
chat_id=msg_data[1],
chat_title=msg_data[2],
sender_name=msg_data[3],
content=msg_data[4],
timestamp=datetime.fromisoformat(msg_data[5]),
is_from_me=bool(msg_data[6]),
sender_id=msg_data[7]
)
except sqlite3.Error as e:
print(f"Database error: {e}")
return None
finally:
if 'conn' in locals():
conn.close()
def send_message(recipient: str, message: str) -> Tuple[bool, str]:
"""Send a Telegram message to the specified recipient.
Args:
recipient: The recipient - either a username (with or without @),
or a chat ID as a string or integer
message: The message text to send
Returns:
Tuple[bool, str]: A tuple containing success status and a status message
"""
try:
# Validate input
if not recipient:
return False, "Recipient must be provided"
url = f"{TELEGRAM_API_BASE_URL}/send"
payload = {
"recipient": recipient,
"message": message
}
response = requests.post(url, json=payload)
# Check if the request was successful
if response.status_code == 200:
result = response.json()
return result.get("success", False), result.get("message", "Unknown response")
else:
return False, f"Error: HTTP {response.status_code} - {response.text}"
except requests.RequestException as e:
return False, f"Request error: {str(e)}"
except json.JSONDecodeError:
return False, f"Error parsing response: Unknown"
except Exception as e:
return False, f"Unexpected error: {str(e)}"