telegram-mcp/telegram-bridge/service.py
2025-04-05 16:08:31 +08:00

224 lines
No EOL
8.1 KiB
Python

"""
Service layer for the Telegram bridge.
Connects the API middleware with database repositories to provide
high-level operations for the application.
"""
import logging
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime
from telethon import events
from api import TelegramApiClient, TelegramMiddleware
from database import ChatRepository, MessageRepository
logger = logging.getLogger(__name__)
class TelegramService:
"""Service for Telegram operations."""
def __init__(
self,
telegram_client: TelegramApiClient,
middleware: TelegramMiddleware,
chat_repo: ChatRepository,
message_repo: MessageRepository
):
"""Initialize the service.
Args:
telegram_client: Telegram API client
middleware: Telegram middleware
chat_repo: Chat repository
message_repo: Message repository
"""
self.client = telegram_client
self.middleware = middleware
self.chat_repo = chat_repo
self.message_repo = message_repo
async def setup(self) -> None:
"""Set up the service, connect to Telegram, and register handlers."""
# Connect to Telegram
await self.client.connect()
# Register event handlers
self.client.add_event_handler(self._handle_new_message, events.NewMessage)
async def authorize(self) -> bool:
"""Authorize with Telegram if needed."""
if await self.client.is_authorized():
logger.info("Already authorized with Telegram")
return True
logger.info("Not authorized with Telegram. Interactive login required.")
return False
async def login(self, phone: str, code: str, password: Optional[str] = None) -> bool:
"""Login to Telegram.
Args:
phone: Phone number
code: Verification code
password: Two-factor authentication password (optional)
Returns:
bool: True if login successful, False otherwise
"""
if password:
return await self.client.sign_in(phone=phone, code=code, password=password)
else:
# First send code request
await self.client.send_code_request(phone)
# Then sign in with the code
return await self.client.sign_in(phone=phone, code=code)
async def sync_all_dialogs(self, limit: int = 100) -> None:
"""Sync all dialogs (chats) from Telegram.
Args:
limit: Maximum number of dialogs to retrieve
"""
logger.info("Starting synchronization of all dialogs")
# Get all dialogs (chats)
dialogs = await self.client.get_dialogs(limit=limit)
for dialog in dialogs:
try:
await self.sync_dialog_history(dialog)
except Exception as e:
logger.error(f"Error syncing dialog {dialog.name}: {e}")
logger.info(f"Completed synchronization of {len(dialogs)} dialogs")
async def sync_dialog_history(self, dialog, limit: int = 100) -> None:
"""Sync message history for a specific dialog.
Args:
dialog: Dialog to sync
limit: Maximum number of messages to retrieve
"""
# Process dialog entity
chat_info = await self.middleware.process_dialog(dialog)
if not chat_info:
logger.warning(f"Could not process dialog: {dialog}")
return
# Store chat information
self.chat_repo.store_chat(
chat_id=chat_info["id"],
title=chat_info["title"],
username=chat_info.get("username"),
chat_type=chat_info["type"],
last_message_time=chat_info["last_message_time"]
)
# Get messages
messages = await self.client.get_messages(dialog.entity, limit=limit)
# Process each message
for message in messages:
msg_info = await self.middleware.process_message(message)
if msg_info:
self.message_repo.store_message(
message_id=msg_info["id"],
chat_id=msg_info["chat_id"],
sender_id=msg_info["sender_id"],
sender_name=msg_info["sender_name"],
content=msg_info["content"],
timestamp=msg_info["timestamp"],
is_from_me=msg_info["is_from_me"]
)
logger.info(f"Synced {len(messages)} messages from {chat_info['title']}")
async def send_message(self, recipient: str, message: str) -> Tuple[bool, str]:
"""Send a message to a Telegram recipient.
Args:
recipient: Recipient identifier (ID, username, or title)
message: Message text to send
Returns:
Tuple[bool, str]: Success status and message
"""
if not self.client.client.is_connected():
return False, "Not connected to Telegram"
entity = await self.middleware.find_entity_by_name_or_id(recipient)
if not entity:
# Try to find in database
try:
# Try to parse as integer
chat_id = int(recipient)
chat = self.chat_repo.get_chat_by_id(chat_id)
if chat:
entity = await self.client.get_entity(chat_id)
except ValueError:
# Not an integer, try to find by name
chats = self.chat_repo.get_chats(query=recipient, limit=1)
if chats:
entity = await self.client.get_entity(chats[0].id)
if not entity:
return False, f"Recipient not found: {recipient}"
# Send the message
sent_message = await self.client.send_message(entity, message)
if sent_message:
# Process and store the sent message
msg_info = await self.middleware.process_message(sent_message)
if msg_info:
self.message_repo.store_message(
message_id=msg_info["id"],
chat_id=msg_info["chat_id"],
sender_id=msg_info["sender_id"],
sender_name=msg_info["sender_name"],
content=msg_info["content"],
timestamp=msg_info["timestamp"],
is_from_me=msg_info["is_from_me"]
)
return True, f"Message sent to {recipient}"
else:
return False, f"Failed to send message to {recipient}"
async def _handle_new_message(self, event) -> None:
"""Handle a new message event from Telegram."""
message = event.message
msg_info = await self.middleware.process_message(message)
if msg_info:
# Process and store chat information
chat_entity = message.chat
if chat_entity:
chat_info = await self.middleware.process_chat_entity(chat_entity)
self.chat_repo.store_chat(
chat_id=chat_info["id"],
title=chat_info["title"],
username=chat_info.get("username"),
chat_type=chat_info["type"],
last_message_time=message.date
)
# Store the message
self.message_repo.store_message(
message_id=msg_info["id"],
chat_id=msg_info["chat_id"],
sender_id=msg_info["sender_id"],
sender_name=msg_info["sender_name"],
content=msg_info["content"],
timestamp=msg_info["timestamp"],
is_from_me=msg_info["is_from_me"]
)
logger.info(
f"Stored message: [{msg_info['timestamp']}] {msg_info['sender_name']} "
f"in {msg_info['chat_title']}: {msg_info['content'][:30]}..."
)