From 922d226719e74725318469ab1ff51c4902a18392 Mon Sep 17 00:00:00 2001 From: Muhammad18557 Date: Sat, 5 Apr 2025 16:13:36 +0800 Subject: [PATCH] api for telegram interaction --- telegram-bridge/api/__init__.py | 5 + telegram-bridge/api/client.py | 190 ++++++++++++++++++++++++++++++ telegram-bridge/api/middleware.py | 155 ++++++++++++++++++++++++ telegram-bridge/api/models.py | 49 ++++++++ 4 files changed, 399 insertions(+) create mode 100644 telegram-bridge/api/__init__.py create mode 100644 telegram-bridge/api/client.py create mode 100644 telegram-bridge/api/middleware.py create mode 100644 telegram-bridge/api/models.py diff --git a/telegram-bridge/api/__init__.py b/telegram-bridge/api/__init__.py new file mode 100644 index 0000000..0bb9df1 --- /dev/null +++ b/telegram-bridge/api/__init__.py @@ -0,0 +1,5 @@ +""" +API module for Telegram interaction. + +Provides client, middleware, and models for interacting with the Telegram API. +""" \ No newline at end of file diff --git a/telegram-bridge/api/client.py b/telegram-bridge/api/client.py new file mode 100644 index 0000000..5843a5c --- /dev/null +++ b/telegram-bridge/api/client.py @@ -0,0 +1,190 @@ +""" +Telegram API client. + +Handles communication with the Telegram API using the Telethon library. +""" + +import os +import logging +import asyncio +from typing import Optional, List, Dict, Any, Callable, Tuple +from datetime import datetime + +from telethon import TelegramClient +from telethon.tl.types import User, Chat, Channel, Message, Dialog +from telethon.utils import get_display_name + +logger = logging.getLogger(__name__) + + +class TelegramApiClient: + """Client for interacting with the Telegram API.""" + + def __init__(self, session_file: str, api_id: str, api_hash: str): + """Initialize the Telegram API client. + + Args: + session_file: Path to the session file for authentication + api_id: Telegram API ID + api_hash: Telegram API hash + """ + self.session_file = session_file + self.api_id = api_id + self.api_hash = api_hash + self.client = TelegramClient(session_file, api_id, api_hash) + self._me = None + + async def connect(self) -> bool: + """Connect to the Telegram API. + + Returns: + bool: True if successfully connected, False otherwise + """ + try: + await self.client.connect() + return True + except Exception as e: + logger.error(f"Failed to connect to Telegram: {e}") + return False + + async def is_authorized(self) -> bool: + """Check if the client is authorized. + + Returns: + bool: True if authorized, False otherwise + """ + return await self.client.is_user_authorized() + + async def send_code_request(self, phone: str) -> bool: + """Send a code request to the given phone number. + + Args: + phone: Phone number to send code to + + Returns: + bool: True if code sent successfully, False otherwise + """ + try: + await self.client.send_code_request(phone) + return True + except Exception as e: + logger.error(f"Failed to send code request: {e}") + return False + + async def sign_in(self, phone: Optional[str] = None, code: Optional[str] = None, + password: Optional[str] = None) -> bool: + """Sign in to Telegram. + + Args: + phone: Phone number (optional) + code: Verification code (optional) + password: Two-factor authentication password (optional) + + Returns: + bool: True if signed in successfully, False otherwise + """ + try: + if password: + await self.client.sign_in(password=password) + else: + await self.client.sign_in(phone, code) + return True + except Exception as e: + logger.error(f"Failed to sign in: {e}") + return False + + async def get_me(self) -> Optional[User]: + """Get the current user. + + Returns: + User: Current user object + """ + if not self._me: + self._me = await self.client.get_me() + return self._me + + async def get_dialogs(self, limit: int = 100) -> List[Dialog]: + """Get dialogs (chats) from Telegram. + + Args: + limit: Maximum number of dialogs to retrieve + + Returns: + List[Dialog]: List of dialogs + """ + try: + return await self.client.get_dialogs(limit=limit) + except Exception as e: + logger.error(f"Failed to get dialogs: {e}") + return [] + + async def get_entity(self, entity_id: Any) -> Optional[Any]: + """Get an entity from Telegram. + + Args: + entity_id: ID of the entity to retrieve + + Returns: + Any: Entity object (User, Chat, or Channel) + """ + try: + return await self.client.get_entity(entity_id) + except Exception as e: + logger.error(f"Failed to get entity: {e}") + return None + + async def get_messages(self, entity: Any, limit: int = 100) -> List[Message]: + """Get messages from a chat. + + Args: + entity: Chat entity + limit: Maximum number of messages to retrieve + + Returns: + List[Message]: List of messages + """ + try: + return await self.client.get_messages(entity, limit=limit) + except Exception as e: + logger.error(f"Failed to get messages: {e}") + return [] + + async def send_message(self, entity: Any, message: str) -> Optional[Message]: + """Send a message to a chat. + + Args: + entity: Chat entity + message: Message text to send + + Returns: + Message: Sent message object + """ + try: + return await self.client.send_message(entity, message) + except Exception as e: + logger.error(f"Failed to send message: {e}") + return None + + def add_event_handler(self, callback: Callable, event: Any) -> None: + """Add an event handler. + + Args: + callback: Callback function to handle the event + event: Event to handle + """ + self.client.add_event_handler(callback, event) + + async def disconnect(self) -> None: + """Disconnect from the Telegram API.""" + await self.client.disconnect() + + def __del__(self): + """Cleanup when the client is deleted.""" + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + loop.create_task(self.disconnect()) + else: + loop.run_until_complete(self.disconnect()) + except: + pass \ No newline at end of file diff --git a/telegram-bridge/api/middleware.py b/telegram-bridge/api/middleware.py new file mode 100644 index 0000000..5231cc9 --- /dev/null +++ b/telegram-bridge/api/middleware.py @@ -0,0 +1,155 @@ +""" +Telegram API middleware. + +Handles common operations between the application and Telegram API such as +authentication, error handling, and entity type conversion. +""" + +import logging +from typing import Dict, Any, Optional, Tuple, List, Callable +from datetime import datetime +from functools import wraps + +from telethon.tl.types import User, Chat, Channel, Message, Dialog +from telethon.utils import get_display_name + +from api.client import TelegramApiClient + +logger = logging.getLogger(__name__) + + +def handle_telegram_errors(func): + """Decorator to handle Telegram API errors.""" + @wraps(func) + async def wrapper(*args, **kwargs): + try: + return await func(*args, **kwargs) + except Exception as e: + logger.error(f"Telegram API error in {func.__name__}: {e}") + return None + return wrapper + + +class TelegramMiddleware: + """Middleware for Telegram API operations.""" + + def __init__(self, client: TelegramApiClient): + """Initialize the middleware with a Telegram client. + + Args: + client: Initialized Telegram API client + """ + self.client = client + + async def process_chat_entity(self, entity: Any) -> Dict[str, Any]: + """Process a chat entity and convert it to a dictionary. + + Args: + entity: Chat entity from Telegram API + + Returns: + Dict: Standardized chat representation + """ + if isinstance(entity, User): + chat_type = "user" + title = get_display_name(entity) + username = entity.username + elif isinstance(entity, Chat): + chat_type = "group" + title = entity.title + username = None + elif isinstance(entity, Channel): + chat_type = "channel" if entity.broadcast else "supergroup" + title = entity.title + username = entity.username + else: + logger.warning(f"Unknown chat type: {type(entity)}") + return {} + + return { + "id": entity.id, + "title": title, + "username": username, + "type": chat_type + } + + @handle_telegram_errors + async def process_dialog(self, dialog: Dialog) -> Dict[str, Any]: + """Process a dialog and convert it to a dictionary. + + Args: + dialog: Dialog from Telegram API + + Returns: + Dict: Standardized dialog representation + """ + chat_info = await self.process_chat_entity(dialog.entity) + chat_info["last_message_time"] = dialog.date + return chat_info + + @handle_telegram_errors + async def process_message(self, message: Message) -> Optional[Dict[str, Any]]: + """Process a message and convert it to a dictionary. + + Args: + message: Message from Telegram API + + Returns: + Dict: Standardized message representation + """ + if not message.text: + return None # Skip non-text messages + + # Get the chat + chat = message.chat + if not chat: + return None + + chat_info = await self.process_chat_entity(chat) + + # Get sender information + sender = await message.get_sender() + sender_id = sender.id if sender else 0 + sender_name = get_display_name(sender) if sender else "Unknown" + + # Check if the message is from the current user + my_id = (await self.client.get_me()).id + is_from_me = sender_id == my_id + + return { + "id": message.id, + "chat_id": chat_info["id"], + "chat_title": chat_info["title"], + "sender_id": sender_id, + "sender_name": sender_name, + "content": message.text, + "timestamp": message.date, + "is_from_me": is_from_me + } + + @handle_telegram_errors + async def find_entity_by_name_or_id(self, recipient: str) -> Optional[Any]: + """Find an entity by name or ID. + + Args: + recipient: Recipient identifier (ID, username, or title) + + Returns: + Any: Found entity or None + """ + # Try to parse as an integer (chat ID) + try: + chat_id = int(recipient) + return await self.client.get_entity(chat_id) + except ValueError: + pass + + # Not an integer, try as username + if recipient.startswith("@"): + recipient = recipient[1:] # Remove @ if present + + try: + return await self.client.get_entity(recipient) + except Exception: + logger.error(f"Could not find entity: {recipient}") + return None \ No newline at end of file diff --git a/telegram-bridge/api/models.py b/telegram-bridge/api/models.py new file mode 100644 index 0000000..50e565d --- /dev/null +++ b/telegram-bridge/api/models.py @@ -0,0 +1,49 @@ +""" +API models for data transfer. + +Defines Pydantic models for request and response data validation. +""" + +from datetime import datetime +from typing import Optional, List +from pydantic import BaseModel, Field + + +class ChatModel(BaseModel): + """Model representing a Telegram chat.""" + id: int + title: str + username: Optional[str] = None + type: str + last_message_time: Optional[datetime] = None + + +class MessageModel(BaseModel): + """Model representing a Telegram message.""" + id: int + chat_id: int + chat_title: str + sender_id: int + sender_name: str + content: str + timestamp: datetime + is_from_me: bool = False + + +class MessageContextModel(BaseModel): + """Model representing a message with its context.""" + message: MessageModel + before: List[MessageModel] = [] + after: List[MessageModel] = [] + + +class SendMessageRequest(BaseModel): + """Model for sending message requests.""" + recipient: str + message: str + + +class SendMessageResponse(BaseModel): + """Model for sending message responses.""" + success: bool + message: str \ No newline at end of file