telegram-mcp/telegram-bridge/api/middleware.py
2025-04-05 16:13:36 +08:00

155 lines
No EOL
4.7 KiB
Python

"""
Telegram API middleware.
Handles common operations between the application and Telegram API such as
authentication, error handling, and entity type conversion.
"""
import logging
from typing import Dict, Any, Optional, Tuple, List, Callable
from datetime import datetime
from functools import wraps
from telethon.tl.types import User, Chat, Channel, Message, Dialog
from telethon.utils import get_display_name
from api.client import TelegramApiClient
logger = logging.getLogger(__name__)
def handle_telegram_errors(func):
"""Decorator to handle Telegram API errors."""
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
logger.error(f"Telegram API error in {func.__name__}: {e}")
return None
return wrapper
class TelegramMiddleware:
"""Middleware for Telegram API operations."""
def __init__(self, client: TelegramApiClient):
"""Initialize the middleware with a Telegram client.
Args:
client: Initialized Telegram API client
"""
self.client = client
async def process_chat_entity(self, entity: Any) -> Dict[str, Any]:
"""Process a chat entity and convert it to a dictionary.
Args:
entity: Chat entity from Telegram API
Returns:
Dict: Standardized chat representation
"""
if isinstance(entity, User):
chat_type = "user"
title = get_display_name(entity)
username = entity.username
elif isinstance(entity, Chat):
chat_type = "group"
title = entity.title
username = None
elif isinstance(entity, Channel):
chat_type = "channel" if entity.broadcast else "supergroup"
title = entity.title
username = entity.username
else:
logger.warning(f"Unknown chat type: {type(entity)}")
return {}
return {
"id": entity.id,
"title": title,
"username": username,
"type": chat_type
}
@handle_telegram_errors
async def process_dialog(self, dialog: Dialog) -> Dict[str, Any]:
"""Process a dialog and convert it to a dictionary.
Args:
dialog: Dialog from Telegram API
Returns:
Dict: Standardized dialog representation
"""
chat_info = await self.process_chat_entity(dialog.entity)
chat_info["last_message_time"] = dialog.date
return chat_info
@handle_telegram_errors
async def process_message(self, message: Message) -> Optional[Dict[str, Any]]:
"""Process a message and convert it to a dictionary.
Args:
message: Message from Telegram API
Returns:
Dict: Standardized message representation
"""
if not message.text:
return None # Skip non-text messages
# Get the chat
chat = message.chat
if not chat:
return None
chat_info = await self.process_chat_entity(chat)
# Get sender information
sender = await message.get_sender()
sender_id = sender.id if sender else 0
sender_name = get_display_name(sender) if sender else "Unknown"
# Check if the message is from the current user
my_id = (await self.client.get_me()).id
is_from_me = sender_id == my_id
return {
"id": message.id,
"chat_id": chat_info["id"],
"chat_title": chat_info["title"],
"sender_id": sender_id,
"sender_name": sender_name,
"content": message.text,
"timestamp": message.date,
"is_from_me": is_from_me
}
@handle_telegram_errors
async def find_entity_by_name_or_id(self, recipient: str) -> Optional[Any]:
"""Find an entity by name or ID.
Args:
recipient: Recipient identifier (ID, username, or title)
Returns:
Any: Found entity or None
"""
# Try to parse as an integer (chat ID)
try:
chat_id = int(recipient)
return await self.client.get_entity(chat_id)
except ValueError:
pass
# Not an integer, try as username
if recipient.startswith("@"):
recipient = recipient[1:] # Remove @ if present
try:
return await self.client.get_entity(recipient)
except Exception:
logger.error(f"Could not find entity: {recipient}")
return None