telegram-mcp/telegram-bridge/api/client.py
2025-04-05 16:13:36 +08:00

190 lines
No EOL
5.8 KiB
Python

"""
Telegram API client.
Handles communication with the Telegram API using the Telethon library.
"""
import os
import logging
import asyncio
from typing import Optional, List, Dict, Any, Callable, Tuple
from datetime import datetime
from telethon import TelegramClient
from telethon.tl.types import User, Chat, Channel, Message, Dialog
from telethon.utils import get_display_name
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class TelegramApiClient:
    """Client for interacting with the Telegram API.

    Thin asynchronous wrapper around a Telethon ``TelegramClient``. Most
    methods catch any exception raised by the underlying call, log it with
    ``logger.error`` and return a neutral value (``False``, ``None`` or an
    empty list) instead of propagating. ``is_authorized``, ``get_me``,
    ``add_event_handler`` and ``disconnect`` do not catch exceptions.
    """

    def __init__(self, session_file: str, api_id: str, api_hash: str):
        """Initialize the Telegram API client.

        Args:
            session_file: Path to the session file for authentication
            api_id: Telegram API ID
            api_hash: Telegram API hash
        """
        self.session_file = session_file
        self.api_id = api_id
        self.api_hash = api_hash
        self.client = TelegramClient(session_file, api_id, api_hash)
        # Cache for the current user, filled lazily by get_me().
        self._me = None

    async def connect(self) -> bool:
        """Connect to the Telegram API.

        Returns:
            bool: True if successfully connected, False otherwise
        """
        try:
            await self.client.connect()
            return True
        except Exception as e:
            logger.error(f"Failed to connect to Telegram: {e}")
            return False

    async def is_authorized(self) -> bool:
        """Check if the client is authorized.

        Exceptions raised by the underlying client are not caught here.

        Returns:
            bool: True if authorized, False otherwise
        """
        return await self.client.is_user_authorized()

    async def send_code_request(self, phone: str) -> bool:
        """Send a code request to the given phone number.

        Args:
            phone: Phone number to send code to

        Returns:
            bool: True if code sent successfully, False otherwise
        """
        try:
            await self.client.send_code_request(phone)
            return True
        except Exception as e:
            logger.error(f"Failed to send code request: {e}")
            return False

    async def sign_in(self, phone: Optional[str] = None, code: Optional[str] = None,
                      password: Optional[str] = None) -> bool:
        """Sign in to Telegram.

        When a non-empty ``password`` is given it takes precedence and only
        the password is passed to the underlying client; otherwise ``phone``
        and ``code`` are used.

        Args:
            phone: Phone number (optional)
            code: Verification code (optional)
            password: Two-factor authentication password (optional)

        Returns:
            bool: True if signed in successfully, False otherwise
        """
        try:
            if password:
                await self.client.sign_in(password=password)
            else:
                await self.client.sign_in(phone, code)
            return True
        except Exception as e:
            logger.error(f"Failed to sign in: {e}")
            return False

    async def get_me(self) -> Optional[User]:
        """Get the current user.

        The result is cached on the instance after the first successful
        lookup; while the cached value is ``None`` every call queries the
        underlying client again. Exceptions are not caught here.

        Returns:
            User: Current user object
        """
        # Explicit None check: only "not fetched yet" should trigger a lookup.
        if self._me is None:
            self._me = await self.client.get_me()
        return self._me

    async def get_dialogs(self, limit: int = 100) -> List[Dialog]:
        """Get dialogs (chats) from Telegram.

        Args:
            limit: Maximum number of dialogs to retrieve

        Returns:
            List[Dialog]: List of dialogs, or an empty list on failure
        """
        try:
            return await self.client.get_dialogs(limit=limit)
        except Exception as e:
            logger.error(f"Failed to get dialogs: {e}")
            return []

    async def get_entity(self, entity_id: Any) -> Optional[Any]:
        """Get an entity from Telegram.

        Args:
            entity_id: ID of the entity to retrieve

        Returns:
            Any: Entity object (User, Chat, or Channel), or None on failure
        """
        try:
            return await self.client.get_entity(entity_id)
        except Exception as e:
            logger.error(f"Failed to get entity: {e}")
            return None

    async def get_messages(self, entity: Any, limit: int = 100) -> List[Message]:
        """Get messages from a chat.

        Args:
            entity: Chat entity
            limit: Maximum number of messages to retrieve

        Returns:
            List[Message]: List of messages, or an empty list on failure
        """
        try:
            return await self.client.get_messages(entity, limit=limit)
        except Exception as e:
            logger.error(f"Failed to get messages: {e}")
            return []

    async def send_message(self, entity: Any, message: str) -> Optional[Message]:
        """Send a message to a chat.

        Args:
            entity: Chat entity
            message: Message text to send

        Returns:
            Message: Sent message object, or None on failure
        """
        try:
            return await self.client.send_message(entity, message)
        except Exception as e:
            logger.error(f"Failed to send message: {e}")
            return None

    def add_event_handler(self, callback: Callable, event: Any) -> None:
        """Add an event handler.

        Args:
            callback: Callback function to handle the event
            event: Event to handle
        """
        self.client.add_event_handler(callback, event)

    async def disconnect(self) -> None:
        """Disconnect from the Telegram API."""
        await self.client.disconnect()

    def __del__(self):
        """Best-effort disconnect when the client object is finalized.

        Schedules ``disconnect()`` on the current event loop if it is running,
        or runs it to completion otherwise. Failures are deliberately ignored
        because a finalizer has no caller to report them to.
        """
        # __init__ may have raised before self.client was assigned; in that
        # case there is nothing to disconnect.
        if getattr(self, "client", None) is None:
            return
        try:
            loop = asyncio.get_event_loop()
            # Check before creating the coroutine: a closed loop cannot run
            # it, and an un-awaited coroutine would only produce a warning.
            if loop.is_closed():
                return
            if loop.is_running():
                loop.create_task(self.disconnect())
            else:
                loop.run_until_complete(self.disconnect())
        except Exception:
            # Catch Exception rather than using a bare except so that
            # KeyboardInterrupt and SystemExit are never swallowed here.
            pass