diff --git a/telegram-bridge/service.py b/telegram-bridge/service.py new file mode 100644 index 0000000..c203bae --- /dev/null +++ b/telegram-bridge/service.py @@ -0,0 +1,224 @@ +""" +Service layer for the Telegram bridge. + +Connects the API middleware with database repositories to provide +high-level operations for the application. +""" + +import logging +from typing import Optional, List, Dict, Any, Tuple +from datetime import datetime + +from telethon import events + +from api import TelegramApiClient, TelegramMiddleware +from database import ChatRepository, MessageRepository + +logger = logging.getLogger(__name__) + + +class TelegramService: + """Service for Telegram operations.""" + + def __init__( + self, + telegram_client: TelegramApiClient, + middleware: TelegramMiddleware, + chat_repo: ChatRepository, + message_repo: MessageRepository + ): + """Initialize the service. + + Args: + telegram_client: Telegram API client + middleware: Telegram middleware + chat_repo: Chat repository + message_repo: Message repository + """ + self.client = telegram_client + self.middleware = middleware + self.chat_repo = chat_repo + self.message_repo = message_repo + + async def setup(self) -> None: + """Set up the service, connect to Telegram, and register handlers.""" + # Connect to Telegram + await self.client.connect() + + # Register event handlers + self.client.add_event_handler(self._handle_new_message, events.NewMessage) + + async def authorize(self) -> bool: + """Authorize with Telegram if needed.""" + if await self.client.is_authorized(): + logger.info("Already authorized with Telegram") + return True + + logger.info("Not authorized with Telegram. Interactive login required.") + return False + + async def login(self, phone: str, code: str, password: Optional[str] = None) -> bool: + """Login to Telegram. + + Args: + phone: Phone number + code: Verification code + password: Two-factor authentication password (optional) + + Returns: + bool: True if login successful, False otherwise + """ + if password: + return await self.client.sign_in(phone=phone, code=code, password=password) + else: + # First send code request + await self.client.send_code_request(phone) + # Then sign in with the code + return await self.client.sign_in(phone=phone, code=code) + + async def sync_all_dialogs(self, limit: int = 100) -> None: + """Sync all dialogs (chats) from Telegram. + + Args: + limit: Maximum number of dialogs to retrieve + """ + logger.info("Starting synchronization of all dialogs") + + # Get all dialogs (chats) + dialogs = await self.client.get_dialogs(limit=limit) + + for dialog in dialogs: + try: + await self.sync_dialog_history(dialog) + except Exception as e: + logger.error(f"Error syncing dialog {dialog.name}: {e}") + + logger.info(f"Completed synchronization of {len(dialogs)} dialogs") + + async def sync_dialog_history(self, dialog, limit: int = 100) -> None: + """Sync message history for a specific dialog. + + Args: + dialog: Dialog to sync + limit: Maximum number of messages to retrieve + """ + # Process dialog entity + chat_info = await self.middleware.process_dialog(dialog) + + if not chat_info: + logger.warning(f"Could not process dialog: {dialog}") + return + + # Store chat information + self.chat_repo.store_chat( + chat_id=chat_info["id"], + title=chat_info["title"], + username=chat_info.get("username"), + chat_type=chat_info["type"], + last_message_time=chat_info["last_message_time"] + ) + + # Get messages + messages = await self.client.get_messages(dialog.entity, limit=limit) + + # Process each message + for message in messages: + msg_info = await self.middleware.process_message(message) + if msg_info: + self.message_repo.store_message( + message_id=msg_info["id"], + chat_id=msg_info["chat_id"], + sender_id=msg_info["sender_id"], + sender_name=msg_info["sender_name"], + content=msg_info["content"], + timestamp=msg_info["timestamp"], + is_from_me=msg_info["is_from_me"] + ) + + logger.info(f"Synced {len(messages)} messages from {chat_info['title']}") + + async def send_message(self, recipient: str, message: str) -> Tuple[bool, str]: + """Send a message to a Telegram recipient. + + Args: + recipient: Recipient identifier (ID, username, or title) + message: Message text to send + + Returns: + Tuple[bool, str]: Success status and message + """ + if not self.client.client.is_connected(): + return False, "Not connected to Telegram" + + entity = await self.middleware.find_entity_by_name_or_id(recipient) + + if not entity: + # Try to find in database + try: + # Try to parse as integer + chat_id = int(recipient) + chat = self.chat_repo.get_chat_by_id(chat_id) + if chat: + entity = await self.client.get_entity(chat_id) + except ValueError: + # Not an integer, try to find by name + chats = self.chat_repo.get_chats(query=recipient, limit=1) + if chats: + entity = await self.client.get_entity(chats[0].id) + + if not entity: + return False, f"Recipient not found: {recipient}" + + # Send the message + sent_message = await self.client.send_message(entity, message) + + if sent_message: + # Process and store the sent message + msg_info = await self.middleware.process_message(sent_message) + if msg_info: + self.message_repo.store_message( + message_id=msg_info["id"], + chat_id=msg_info["chat_id"], + sender_id=msg_info["sender_id"], + sender_name=msg_info["sender_name"], + content=msg_info["content"], + timestamp=msg_info["timestamp"], + is_from_me=msg_info["is_from_me"] + ) + return True, f"Message sent to {recipient}" + else: + return False, f"Failed to send message to {recipient}" + + async def _handle_new_message(self, event) -> None: + """Handle a new message event from Telegram.""" + message = event.message + msg_info = await self.middleware.process_message(message) + + if msg_info: + # Process and store chat information + chat_entity = message.chat + if chat_entity: + chat_info = await self.middleware.process_chat_entity(chat_entity) + self.chat_repo.store_chat( + chat_id=chat_info["id"], + title=chat_info["title"], + username=chat_info.get("username"), + chat_type=chat_info["type"], + last_message_time=message.date + ) + + # Store the message + self.message_repo.store_message( + message_id=msg_info["id"], + chat_id=msg_info["chat_id"], + sender_id=msg_info["sender_id"], + sender_name=msg_info["sender_name"], + content=msg_info["content"], + timestamp=msg_info["timestamp"], + is_from_me=msg_info["is_from_me"] + ) + + logger.info( + f"Stored message: [{msg_info['timestamp']}] {msg_info['sender_name']} " + f"in {msg_info['chat_title']}: {msg_info['content'][:30]}..." + ) \ No newline at end of file