api for telegram interaction

This commit is contained in:
Muhammad18557 2025-04-05 16:13:36 +08:00
parent dd9015bc24
commit 922d226719
4 changed files with 399 additions and 0 deletions

View file

@ -0,0 +1,5 @@
"""
API module for Telegram interaction.
Provides client, middleware, and models for interacting with the Telegram API.
"""

View file

@ -0,0 +1,190 @@
"""
Telegram API client.
Handles communication with the Telegram API using the Telethon library.
"""
import os
import logging
import asyncio
from typing import Optional, List, Dict, Any, Callable, Tuple
from datetime import datetime
from telethon import TelegramClient
from telethon.tl.types import User, Chat, Channel, Message, Dialog
from telethon.utils import get_display_name
logger = logging.getLogger(__name__)
class TelegramApiClient:
"""Client for interacting with the Telegram API."""
def __init__(self, session_file: str, api_id: str, api_hash: str):
"""Initialize the Telegram API client.
Args:
session_file: Path to the session file for authentication
api_id: Telegram API ID
api_hash: Telegram API hash
"""
self.session_file = session_file
self.api_id = api_id
self.api_hash = api_hash
self.client = TelegramClient(session_file, api_id, api_hash)
self._me = None
async def connect(self) -> bool:
"""Connect to the Telegram API.
Returns:
bool: True if successfully connected, False otherwise
"""
try:
await self.client.connect()
return True
except Exception as e:
logger.error(f"Failed to connect to Telegram: {e}")
return False
async def is_authorized(self) -> bool:
"""Check if the client is authorized.
Returns:
bool: True if authorized, False otherwise
"""
return await self.client.is_user_authorized()
async def send_code_request(self, phone: str) -> bool:
"""Send a code request to the given phone number.
Args:
phone: Phone number to send code to
Returns:
bool: True if code sent successfully, False otherwise
"""
try:
await self.client.send_code_request(phone)
return True
except Exception as e:
logger.error(f"Failed to send code request: {e}")
return False
async def sign_in(self, phone: Optional[str] = None, code: Optional[str] = None,
password: Optional[str] = None) -> bool:
"""Sign in to Telegram.
Args:
phone: Phone number (optional)
code: Verification code (optional)
password: Two-factor authentication password (optional)
Returns:
bool: True if signed in successfully, False otherwise
"""
try:
if password:
await self.client.sign_in(password=password)
else:
await self.client.sign_in(phone, code)
return True
except Exception as e:
logger.error(f"Failed to sign in: {e}")
return False
async def get_me(self) -> Optional[User]:
"""Get the current user.
Returns:
User: Current user object
"""
if not self._me:
self._me = await self.client.get_me()
return self._me
async def get_dialogs(self, limit: int = 100) -> List[Dialog]:
"""Get dialogs (chats) from Telegram.
Args:
limit: Maximum number of dialogs to retrieve
Returns:
List[Dialog]: List of dialogs
"""
try:
return await self.client.get_dialogs(limit=limit)
except Exception as e:
logger.error(f"Failed to get dialogs: {e}")
return []
async def get_entity(self, entity_id: Any) -> Optional[Any]:
"""Get an entity from Telegram.
Args:
entity_id: ID of the entity to retrieve
Returns:
Any: Entity object (User, Chat, or Channel)
"""
try:
return await self.client.get_entity(entity_id)
except Exception as e:
logger.error(f"Failed to get entity: {e}")
return None
async def get_messages(self, entity: Any, limit: int = 100) -> List[Message]:
"""Get messages from a chat.
Args:
entity: Chat entity
limit: Maximum number of messages to retrieve
Returns:
List[Message]: List of messages
"""
try:
return await self.client.get_messages(entity, limit=limit)
except Exception as e:
logger.error(f"Failed to get messages: {e}")
return []
async def send_message(self, entity: Any, message: str) -> Optional[Message]:
"""Send a message to a chat.
Args:
entity: Chat entity
message: Message text to send
Returns:
Message: Sent message object
"""
try:
return await self.client.send_message(entity, message)
except Exception as e:
logger.error(f"Failed to send message: {e}")
return None
def add_event_handler(self, callback: Callable, event: Any) -> None:
"""Add an event handler.
Args:
callback: Callback function to handle the event
event: Event to handle
"""
self.client.add_event_handler(callback, event)
async def disconnect(self) -> None:
"""Disconnect from the Telegram API."""
await self.client.disconnect()
def __del__(self):
"""Cleanup when the client is deleted."""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop.create_task(self.disconnect())
else:
loop.run_until_complete(self.disconnect())
except:
pass

View file

@ -0,0 +1,155 @@
"""
Telegram API middleware.
Handles common operations between the application and Telegram API such as
authentication, error handling, and entity type conversion.
"""
import logging
from typing import Dict, Any, Optional, Tuple, List, Callable
from datetime import datetime
from functools import wraps
from telethon.tl.types import User, Chat, Channel, Message, Dialog
from telethon.utils import get_display_name
from api.client import TelegramApiClient
logger = logging.getLogger(__name__)
def handle_telegram_errors(func):
"""Decorator to handle Telegram API errors."""
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
logger.error(f"Telegram API error in {func.__name__}: {e}")
return None
return wrapper
class TelegramMiddleware:
"""Middleware for Telegram API operations."""
def __init__(self, client: TelegramApiClient):
"""Initialize the middleware with a Telegram client.
Args:
client: Initialized Telegram API client
"""
self.client = client
async def process_chat_entity(self, entity: Any) -> Dict[str, Any]:
"""Process a chat entity and convert it to a dictionary.
Args:
entity: Chat entity from Telegram API
Returns:
Dict: Standardized chat representation
"""
if isinstance(entity, User):
chat_type = "user"
title = get_display_name(entity)
username = entity.username
elif isinstance(entity, Chat):
chat_type = "group"
title = entity.title
username = None
elif isinstance(entity, Channel):
chat_type = "channel" if entity.broadcast else "supergroup"
title = entity.title
username = entity.username
else:
logger.warning(f"Unknown chat type: {type(entity)}")
return {}
return {
"id": entity.id,
"title": title,
"username": username,
"type": chat_type
}
@handle_telegram_errors
async def process_dialog(self, dialog: Dialog) -> Dict[str, Any]:
"""Process a dialog and convert it to a dictionary.
Args:
dialog: Dialog from Telegram API
Returns:
Dict: Standardized dialog representation
"""
chat_info = await self.process_chat_entity(dialog.entity)
chat_info["last_message_time"] = dialog.date
return chat_info
@handle_telegram_errors
async def process_message(self, message: Message) -> Optional[Dict[str, Any]]:
"""Process a message and convert it to a dictionary.
Args:
message: Message from Telegram API
Returns:
Dict: Standardized message representation
"""
if not message.text:
return None # Skip non-text messages
# Get the chat
chat = message.chat
if not chat:
return None
chat_info = await self.process_chat_entity(chat)
# Get sender information
sender = await message.get_sender()
sender_id = sender.id if sender else 0
sender_name = get_display_name(sender) if sender else "Unknown"
# Check if the message is from the current user
my_id = (await self.client.get_me()).id
is_from_me = sender_id == my_id
return {
"id": message.id,
"chat_id": chat_info["id"],
"chat_title": chat_info["title"],
"sender_id": sender_id,
"sender_name": sender_name,
"content": message.text,
"timestamp": message.date,
"is_from_me": is_from_me
}
@handle_telegram_errors
async def find_entity_by_name_or_id(self, recipient: str) -> Optional[Any]:
"""Find an entity by name or ID.
Args:
recipient: Recipient identifier (ID, username, or title)
Returns:
Any: Found entity or None
"""
# Try to parse as an integer (chat ID)
try:
chat_id = int(recipient)
return await self.client.get_entity(chat_id)
except ValueError:
pass
# Not an integer, try as username
if recipient.startswith("@"):
recipient = recipient[1:] # Remove @ if present
try:
return await self.client.get_entity(recipient)
except Exception:
logger.error(f"Could not find entity: {recipient}")
return None

View file

@ -0,0 +1,49 @@
"""
API models for data transfer.
Defines Pydantic models for request and response data validation.
"""
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, Field
class ChatModel(BaseModel):
"""Model representing a Telegram chat."""
id: int
title: str
username: Optional[str] = None
type: str
last_message_time: Optional[datetime] = None
class MessageModel(BaseModel):
"""Model representing a Telegram message."""
id: int
chat_id: int
chat_title: str
sender_id: int
sender_name: str
content: str
timestamp: datetime
is_from_me: bool = False
class MessageContextModel(BaseModel):
"""Model representing a message with its context."""
message: MessageModel
before: List[MessageModel] = []
after: List[MessageModel] = []
class SendMessageRequest(BaseModel):
"""Model for sending message requests."""
recipient: str
message: str
class SendMessageResponse(BaseModel):
"""Model for sending message responses."""
success: bool
message: str