Merge pull request #1 from Muhammad18557/refactor

Refactor Telegram MCP into modular package and use SQLAlchemy ORM
This commit is contained in:
Muhammad Abdullah 2025-04-05 20:02:18 +08:00 committed by GitHub
commit 7033b20a35
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 1933 additions and 1198 deletions

View file

@ -3,6 +3,10 @@ telethon
cryptg
python-dotenv
requests
sqlalchemy
pydantic
fastapi
uvicorn
# MCP Server dependencies
fastmcp

View file

@ -0,0 +1,9 @@
"""
Telegram Bridge.
Provides a bridge between Telegram API and HTTP API for sending and receiving messages.
"""
from api import TelegramApiClient
from database import init_db

View file

@ -0,0 +1,15 @@
"""
API module for Telegram interaction.
Provides client, middleware, and models for interacting with the Telegram API.
"""
from api.client import TelegramApiClient
from api.middleware import TelegramMiddleware, handle_telegram_errors
from api.models import (
ChatModel,
MessageModel,
MessageContextModel,
SendMessageRequest,
SendMessageResponse
)

View file

@ -0,0 +1,190 @@
"""
Telegram API client.
Handles communication with the Telegram API using the Telethon library.
"""
import os
import logging
import asyncio
from typing import Optional, List, Dict, Any, Callable, Tuple
from datetime import datetime
from telethon import TelegramClient
from telethon.tl.types import User, Chat, Channel, Message, Dialog
from telethon.utils import get_display_name
logger = logging.getLogger(__name__)
class TelegramApiClient:
"""Client for interacting with the Telegram API."""
def __init__(self, session_file: str, api_id: str, api_hash: str):
"""Initialize the Telegram API client.
Args:
session_file: Path to the session file for authentication
api_id: Telegram API ID
api_hash: Telegram API hash
"""
self.session_file = session_file
self.api_id = api_id
self.api_hash = api_hash
self.client = TelegramClient(session_file, api_id, api_hash)
self._me = None
async def connect(self) -> bool:
"""Connect to the Telegram API.
Returns:
bool: True if successfully connected, False otherwise
"""
try:
await self.client.connect()
return True
except Exception as e:
logger.error(f"Failed to connect to Telegram: {e}")
return False
async def is_authorized(self) -> bool:
"""Check if the client is authorized.
Returns:
bool: True if authorized, False otherwise
"""
return await self.client.is_user_authorized()
async def send_code_request(self, phone: str) -> bool:
"""Send a code request to the given phone number.
Args:
phone: Phone number to send code to
Returns:
bool: True if code sent successfully, False otherwise
"""
try:
await self.client.send_code_request(phone)
return True
except Exception as e:
logger.error(f"Failed to send code request: {e}")
return False
async def sign_in(self, phone: Optional[str] = None, code: Optional[str] = None,
password: Optional[str] = None) -> bool:
"""Sign in to Telegram.
Args:
phone: Phone number (optional)
code: Verification code (optional)
password: Two-factor authentication password (optional)
Returns:
bool: True if signed in successfully, False otherwise
"""
try:
if password:
await self.client.sign_in(password=password)
else:
await self.client.sign_in(phone, code)
return True
except Exception as e:
logger.error(f"Failed to sign in: {e}")
return False
async def get_me(self) -> Optional[User]:
"""Get the current user.
Returns:
User: Current user object
"""
if not self._me:
self._me = await self.client.get_me()
return self._me
async def get_dialogs(self, limit: int = 100) -> List[Dialog]:
"""Get dialogs (chats) from Telegram.
Args:
limit: Maximum number of dialogs to retrieve
Returns:
List[Dialog]: List of dialogs
"""
try:
return await self.client.get_dialogs(limit=limit)
except Exception as e:
logger.error(f"Failed to get dialogs: {e}")
return []
async def get_entity(self, entity_id: Any) -> Optional[Any]:
"""Get an entity from Telegram.
Args:
entity_id: ID of the entity to retrieve
Returns:
Any: Entity object (User, Chat, or Channel)
"""
try:
return await self.client.get_entity(entity_id)
except Exception as e:
logger.error(f"Failed to get entity: {e}")
return None
async def get_messages(self, entity: Any, limit: int = 100) -> List[Message]:
"""Get messages from a chat.
Args:
entity: Chat entity
limit: Maximum number of messages to retrieve
Returns:
List[Message]: List of messages
"""
try:
return await self.client.get_messages(entity, limit=limit)
except Exception as e:
logger.error(f"Failed to get messages: {e}")
return []
async def send_message(self, entity: Any, message: str) -> Optional[Message]:
"""Send a message to a chat.
Args:
entity: Chat entity
message: Message text to send
Returns:
Message: Sent message object
"""
try:
return await self.client.send_message(entity, message)
except Exception as e:
logger.error(f"Failed to send message: {e}")
return None
def add_event_handler(self, callback: Callable, event: Any) -> None:
"""Add an event handler.
Args:
callback: Callback function to handle the event
event: Event to handle
"""
self.client.add_event_handler(callback, event)
async def disconnect(self) -> None:
"""Disconnect from the Telegram API."""
await self.client.disconnect()
def __del__(self):
"""Cleanup when the client is deleted."""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop.create_task(self.disconnect())
else:
loop.run_until_complete(self.disconnect())
except:
pass

View file

@ -0,0 +1,155 @@
"""
Telegram API middleware.
Handles common operations between the application and Telegram API such as
authentication, error handling, and entity type conversion.
"""
import logging
from typing import Dict, Any, Optional, Tuple, List, Callable
from datetime import datetime
from functools import wraps
from telethon.tl.types import User, Chat, Channel, Message, Dialog
from telethon.utils import get_display_name
from api.client import TelegramApiClient
logger = logging.getLogger(__name__)
def handle_telegram_errors(func):
"""Decorator to handle Telegram API errors."""
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
logger.error(f"Telegram API error in {func.__name__}: {e}")
return None
return wrapper
class TelegramMiddleware:
"""Middleware for Telegram API operations."""
def __init__(self, client: TelegramApiClient):
"""Initialize the middleware with a Telegram client.
Args:
client: Initialized Telegram API client
"""
self.client = client
async def process_chat_entity(self, entity: Any) -> Dict[str, Any]:
"""Process a chat entity and convert it to a dictionary.
Args:
entity: Chat entity from Telegram API
Returns:
Dict: Standardized chat representation
"""
if isinstance(entity, User):
chat_type = "user"
title = get_display_name(entity)
username = entity.username
elif isinstance(entity, Chat):
chat_type = "group"
title = entity.title
username = None
elif isinstance(entity, Channel):
chat_type = "channel" if entity.broadcast else "supergroup"
title = entity.title
username = entity.username
else:
logger.warning(f"Unknown chat type: {type(entity)}")
return {}
return {
"id": entity.id,
"title": title,
"username": username,
"type": chat_type
}
@handle_telegram_errors
async def process_dialog(self, dialog: Dialog) -> Dict[str, Any]:
"""Process a dialog and convert it to a dictionary.
Args:
dialog: Dialog from Telegram API
Returns:
Dict: Standardized dialog representation
"""
chat_info = await self.process_chat_entity(dialog.entity)
chat_info["last_message_time"] = dialog.date
return chat_info
@handle_telegram_errors
async def process_message(self, message: Message) -> Optional[Dict[str, Any]]:
"""Process a message and convert it to a dictionary.
Args:
message: Message from Telegram API
Returns:
Dict: Standardized message representation
"""
if not message.text:
return None # Skip non-text messages
# Get the chat
chat = message.chat
if not chat:
return None
chat_info = await self.process_chat_entity(chat)
# Get sender information
sender = await message.get_sender()
sender_id = sender.id if sender else 0
sender_name = get_display_name(sender) if sender else "Unknown"
# Check if the message is from the current user
my_id = (await self.client.get_me()).id
is_from_me = sender_id == my_id
return {
"id": message.id,
"chat_id": chat_info["id"],
"chat_title": chat_info["title"],
"sender_id": sender_id,
"sender_name": sender_name,
"content": message.text,
"timestamp": message.date,
"is_from_me": is_from_me
}
@handle_telegram_errors
async def find_entity_by_name_or_id(self, recipient: str) -> Optional[Any]:
"""Find an entity by name or ID.
Args:
recipient: Recipient identifier (ID, username, or title)
Returns:
Any: Found entity or None
"""
# Try to parse as an integer (chat ID)
try:
chat_id = int(recipient)
return await self.client.get_entity(chat_id)
except ValueError:
pass
# Not an integer, try as username
if recipient.startswith("@"):
recipient = recipient[1:] # Remove @ if present
try:
return await self.client.get_entity(recipient)
except Exception:
logger.error(f"Could not find entity: {recipient}")
return None

View file

@ -0,0 +1,49 @@
"""
API models for data transfer.
Defines Pydantic models for request and response data validation.
"""
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, Field
class ChatModel(BaseModel):
"""Model representing a Telegram chat."""
id: int
title: str
username: Optional[str] = None
type: str
last_message_time: Optional[datetime] = None
class MessageModel(BaseModel):
"""Model representing a Telegram message."""
id: int
chat_id: int
chat_title: str
sender_id: int
sender_name: str
content: str
timestamp: datetime
is_from_me: bool = False
class MessageContextModel(BaseModel):
"""Model representing a message with its context."""
message: MessageModel
before: List[MessageModel] = []
after: List[MessageModel] = []
class SendMessageRequest(BaseModel):
"""Model for sending message requests."""
recipient: str
message: str
class SendMessageResponse(BaseModel):
"""Model for sending message responses."""
success: bool
message: str

49
telegram-bridge/config.py Normal file
View file

@ -0,0 +1,49 @@
"""
Configuration management for the Telegram bridge.
Loads environment variables and provides configuration settings.
"""
import os
import logging
from dotenv import load_dotenv
# Load environment variables from .env file if it exists
load_dotenv()
# Directory paths
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STORE_DIR = os.path.join(BASE_DIR, "store")
os.makedirs(STORE_DIR, exist_ok=True)
# Session and database files
SESSION_FILE = os.path.join(STORE_DIR, "telegram_session")
DB_PATH = os.path.join(STORE_DIR, "messages.db")
# API credentials
API_ID = os.getenv("TELEGRAM_API_ID")
API_HASH = os.getenv("TELEGRAM_API_HASH")
# Server configuration
HTTP_PORT = int(os.getenv("HTTP_PORT", "8081"))
HTTP_HOST = os.getenv("HTTP_HOST", "")
# Logging configuration
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# Initialize logging
logging.basicConfig(
level=getattr(logging, LOG_LEVEL),
format=LOG_FORMAT,
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# Validate required settings
if not API_ID or not API_HASH:
logger.error(
"TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables must be set"
)
logger.error("Get them from https://my.telegram.org/auth")
raise ValueError("Missing API credentials")

View file

@ -0,0 +1,9 @@
"""
Database module for the Telegram bridge.
Provides ORM models, connection management, and repositories for data access.
"""
from database.base import init_db, get_session
from database.models import Chat, Message
from database.repositories import ChatRepository, MessageRepository

View file

@ -0,0 +1,42 @@
"""
Database configuration and session management.
Provides a SQLAlchemy engine and session factory with connection pooling.
"""
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.pool import QueuePool
from database.models import Base
# Get database path
STORE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "store")
os.makedirs(STORE_DIR, exist_ok=True)
DB_PATH = os.path.join(STORE_DIR, "messages.db")
# Create engine with connection pooling
engine = create_engine(
f"sqlite:///{DB_PATH}",
connect_args={"check_same_thread": False}, # Needed for SQLite
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_timeout=30,
pool_recycle=3600,
)
# Create session factory
SessionFactory = sessionmaker(bind=engine)
Session = scoped_session(SessionFactory)
def init_db():
"""Initialize the database schema."""
Base.metadata.create_all(engine)
def get_session():
"""Get a database session from the pool."""
return Session()

View file

@ -0,0 +1,58 @@
"""
Database models for the Telegram bridge.
This module defines SQLAlchemy ORM models for storing Telegram chats and messages.
"""
from sqlalchemy import Column, Integer, String, Text, Boolean, ForeignKey, Index, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from datetime import datetime
Base = declarative_base()
class Chat(Base):
"""Represents a Telegram chat (direct message, group, channel, etc.)."""
__tablename__ = "chats"
id = Column(Integer, primary_key=True)
title = Column(String, nullable=False)
username = Column(String)
type = Column(String, nullable=False)
last_message_time = Column(DateTime)
# Relationship with messages
messages = relationship("Message", back_populates="chat", cascade="all, delete-orphan")
def __repr__(self):
return f"<Chat(id={self.id}, title='{self.title}', type='{self.type}')>"
class Message(Base):
"""Represents a Telegram message with all its metadata."""
__tablename__ = "messages"
id = Column(Integer, primary_key=True)
chat_id = Column(Integer, ForeignKey("chats.id"), primary_key=True)
sender_id = Column(Integer)
sender_name = Column(String)
content = Column(Text)
timestamp = Column(DateTime, default=datetime.now)
is_from_me = Column(Boolean, default=False)
# Relationship with chat
chat = relationship("Chat", back_populates="messages")
# Indexes for improved query performance
__table_args__ = (
Index("idx_messages_chat_id", "chat_id"),
Index("idx_messages_timestamp", "timestamp"),
Index("idx_messages_content", "content"),
Index("idx_messages_sender_id", "sender_id"),
)
def __repr__(self):
return f"<Message(id={self.id}, chat_id={self.chat_id}, sender='{self.sender_name}')>"

View file

@ -0,0 +1,228 @@
"""
Repository classes for database operations.
Provides abstraction for data access operations on Telegram chats and messages.
"""
from datetime import datetime
from typing import List, Optional, Dict, Any, Tuple
from sqlalchemy import desc, or_, and_
from database.base import get_session
from database.models import Chat, Message
class ChatRepository:
"""Repository for chat operations."""
def store_chat(
self,
chat_id: int,
title: str,
username: Optional[str],
chat_type: str,
last_message_time: datetime,
) -> None:
"""Store a chat in the database."""
session = get_session()
try:
chat = session.query(Chat).filter_by(id=chat_id).first()
if chat:
# Update existing chat
chat.title = title
chat.username = username
chat.type = chat_type
chat.last_message_time = last_message_time
else:
# Create new chat
chat = Chat(
id=chat_id,
title=title,
username=username,
type=chat_type,
last_message_time=last_message_time
)
session.add(chat)
session.commit()
finally:
session.close()
def get_chats(
self,
query: Optional[str] = None,
limit: int = 50,
offset: int = 0,
chat_type: Optional[str] = None,
sort_by: str = "last_message_time"
) -> List[Chat]:
"""Get chats from the database."""
session = get_session()
try:
# Build query
db_query = session.query(Chat)
# Apply filters
if query:
db_query = db_query.filter(
or_(
Chat.title.ilike(f"%{query}%"),
Chat.username.ilike(f"%{query}%")
)
)
if chat_type:
db_query = db_query.filter(Chat.type == chat_type)
# Apply sorting
if sort_by == "last_message_time":
db_query = db_query.order_by(desc(Chat.last_message_time))
else:
db_query = db_query.order_by(Chat.title)
# Apply pagination
db_query = db_query.limit(limit).offset(offset)
return db_query.all()
finally:
session.close()
def get_chat_by_id(self, chat_id: int) -> Optional[Chat]:
"""Get a chat by its ID."""
session = get_session()
try:
return session.query(Chat).filter_by(id=chat_id).first()
finally:
session.close()
class MessageRepository:
"""Repository for message operations."""
def store_message(
self,
message_id: int,
chat_id: int,
sender_id: int,
sender_name: str,
content: str,
timestamp: datetime,
is_from_me: bool,
) -> None:
"""Store a message in the database."""
if not content: # Skip empty messages
return
session = get_session()
try:
message = session.query(Message).filter_by(
id=message_id, chat_id=chat_id
).first()
if message:
# Update existing message
message.sender_id = sender_id
message.sender_name = sender_name
message.content = content
message.timestamp = timestamp
message.is_from_me = is_from_me
else:
# Create new message
message = Message(
id=message_id,
chat_id=chat_id,
sender_id=sender_id,
sender_name=sender_name,
content=content,
timestamp=timestamp,
is_from_me=is_from_me
)
session.add(message)
session.commit()
finally:
session.close()
def get_messages(
self,
chat_id: Optional[int] = None,
sender_id: Optional[int] = None,
query: Optional[str] = None,
limit: int = 50,
offset: int = 0,
date_range: Optional[Tuple[datetime, datetime]] = None,
) -> List[Message]:
"""Get messages from the database."""
session = get_session()
try:
# Build query
db_query = session.query(Message).join(Chat)
# Apply filters
filters = []
if chat_id:
filters.append(Message.chat_id == chat_id)
if sender_id:
filters.append(Message.sender_id == sender_id)
if query:
filters.append(Message.content.ilike(f"%{query}%"))
if date_range:
start_date, end_date = date_range
filters.append(and_(
Message.timestamp >= start_date,
Message.timestamp <= end_date
))
if filters:
db_query = db_query.filter(and_(*filters))
# Apply sorting and pagination
db_query = db_query.order_by(desc(Message.timestamp))
db_query = db_query.limit(limit).offset(offset)
return db_query.all()
finally:
session.close()
def get_message_context(
self,
message_id: int,
chat_id: int,
before: int = 5,
after: int = 5
) -> Dict[str, Any]:
"""Get context around a specific message."""
session = get_session()
try:
# Get the target message
target_message = session.query(Message).filter_by(
id=message_id, chat_id=chat_id
).first()
if not target_message:
raise ValueError(f"Message with ID {message_id} in chat {chat_id} not found")
# Get messages before
before_messages = session.query(Message).filter(
Message.chat_id == chat_id,
Message.timestamp < target_message.timestamp
).order_by(desc(Message.timestamp)).limit(before).all()
# Get messages after
after_messages = session.query(Message).filter(
Message.chat_id == chat_id,
Message.timestamp > target_message.timestamp
).order_by(Message.timestamp).limit(after).all()
return {
"message": target_message,
"before": before_messages,
"after": after_messages
}
finally:
session.close()

View file

@ -1,610 +1,134 @@
"""
Telegram Bridge Module
Telegram Bridge main entry point.
This module connects to the Telegram API using the Telethon library, enabling
interaction with Telegram chats and messages. It provides an HTTP server for
sending messages to Telegram users or groups, and stores message history in
a SQLite database.
Key Features:
- Connects to Telegram using API credentials.
- Stores chat and message data in a database.
- Provides an HTTP API for sending messages.
- Synchronizes message history and processes new messages.
Usage:
- Ensure TELEGRAM_API_ID and TELEGRAM_API_HASH are set in environment variables.
- Run the script to start the Telegram bridge and HTTP server.
- Use the '/api/send' endpoint to send messages via HTTP requests.
This module initializes and runs the Telegram bridge application, connecting
to the Telegram API and providing a REST API server for interaction.
"""
import os
import sqlite3
import json
import asyncio
import logging
import sys
from datetime import datetime
from typing import Dict, Any, Optional, List, Tuple
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
from dotenv import load_dotenv
import uvicorn
from telethon import TelegramClient, events
from telethon.tl.types import (
User,
Chat,
Channel,
Message,
Dialog,
)
from telethon.utils import get_display_name
from config import API_ID, API_HASH, SESSION_FILE, HTTP_PORT, HTTP_HOST
from database import init_db, ChatRepository, MessageRepository
from api import TelegramApiClient, TelegramMiddleware
from service import TelegramService
from server import app, get_telegram_service
# Global variable to store the main event loop
main_loop = None
# Load environment variables from .env file if it exists
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
# Initialize logger
logger = logging.getLogger(__name__)
# Directory for storing data
STORE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "store")
os.makedirs(STORE_DIR, exist_ok=True)
# Database path
DB_PATH = os.path.join(STORE_DIR, "messages.db")
# Global service instance
telegram_service = None
# API credentials from environment variables
API_ID = os.getenv("TELEGRAM_API_ID")
API_HASH = os.getenv("TELEGRAM_API_HASH")
if not API_ID or not API_HASH:
logger.error(
"TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables must be set"
)
logger.error("Get them from https://my.telegram.org/auth")
sys.exit(1)
# Override the get_telegram_service function in the API app
def get_service_override():
"""Get the Telegram service singleton."""
global telegram_service
if telegram_service is None:
raise RuntimeError("Telegram service not initialized")
return telegram_service
# Initialize the Telegram client
SESSION_FILE = os.path.join(STORE_DIR, "telegram_session")
client = TelegramClient(SESSION_FILE, API_ID, API_HASH)
# Initialize the application
async def init_app():
"""Initialize the application components."""
global telegram_service
class MessageStore:
"""Handles storage and retrieval of Telegram messages in SQLite."""
# Initialize database
init_db()
def __init__(self, db_path: str):
"""Initialize the message store with the given database path."""
self.db_path = db_path
self.init_db()
# Create repositories
chat_repo = ChatRepository()
message_repo = MessageRepository()
def init_db(self):
"""Initialize the database with necessary tables."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create API client
client = TelegramApiClient(SESSION_FILE, API_ID, API_HASH)
# Create tables if they don't exist
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS chats (
id INTEGER PRIMARY KEY,
title TEXT,
username TEXT,
type TEXT,
last_message_time TIMESTAMP
)
"""
)
# Create middleware
middleware = TelegramMiddleware(client)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER,
chat_id INTEGER,
sender_id INTEGER,
sender_name TEXT,
content TEXT,
timestamp TIMESTAMP,
is_from_me BOOLEAN,
PRIMARY KEY (id, chat_id),
FOREIGN KEY (chat_id) REFERENCES chats(id)
)
"""
)
# Create service
telegram_service = TelegramService(client, middleware, chat_repo, message_repo)
# Create indexes for efficient queries
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_messages_chat_id ON messages(chat_id)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_messages_content ON messages(content)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_messages_sender_id ON messages(sender_id)"
)
# Override the service getter in the API app
app.dependency_overrides[get_telegram_service] = get_service_override
conn.commit()
conn.close()
# Setup the service
await telegram_service.setup()
def store_chat(
self,
chat_id: int,
title: str,
username: Optional[str],
chat_type: str,
last_message_time: datetime,
) -> None:
"""Store a chat in the database."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Return the service for further use
return telegram_service
cursor.execute(
"INSERT OR REPLACE INTO chats (id, title, username, type, last_message_time) VALUES (?, ?, ?, ?, ?)",
(chat_id, title, username, chat_type, last_message_time.isoformat()),
)
conn.commit()
conn.close()
async def login_flow():
"""Interactive login flow for Telegram."""
global telegram_service
def store_message(
self,
message_id: int,
chat_id: int,
sender_id: int,
sender_name: str,
content: str,
timestamp: datetime,
is_from_me: bool,
) -> None:
"""Store a message in the database."""
if not content: # Skip empty messages
return
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"""INSERT OR REPLACE INTO messages
(id, chat_id, sender_id, sender_name, content, timestamp, is_from_me)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
message_id,
chat_id,
sender_id,
sender_name,
content,
timestamp.isoformat(),
is_from_me,
),
)
conn.commit()
conn.close()
def get_messages(
self,
chat_id: Optional[int] = None,
limit: int = 50,
query: Optional[str] = None,
offset: int = 0,
) -> List[Dict[str, Any]]:
"""Get messages from the database."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
query_parts = [
"SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id FROM messages m"
]
query_parts.append("JOIN chats c ON m.chat_id = c.id")
conditions = []
params = []
if chat_id:
conditions.append("m.chat_id = ?")
params.append(chat_id)
if query:
conditions.append("m.content LIKE ?")
params.append(f"%{query}%")
if conditions:
query_parts.append("WHERE " + " AND ".join(conditions))
query_parts.append("ORDER BY m.timestamp DESC")
query_parts.append("LIMIT ? OFFSET ?")
params.extend([limit, offset])
cursor.execute(" ".join(query_parts), tuple(params))
messages = cursor.fetchall()
results = []
for msg in messages:
timestamp = datetime.fromisoformat(msg[5])
results.append(
{
"id": msg[0],
"chat_id": msg[1],
"chat_title": msg[2],
"sender_name": msg[3],
"content": msg[4],
"timestamp": timestamp,
"is_from_me": msg[6],
"sender_id": msg[7],
}
)
conn.close()
return results
def get_chats(
self, limit: int = 50, query: Optional[str] = None
) -> List[Dict[str, Any]]:
"""Get chats from the database."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
query_parts = ["SELECT id, title, username, type, last_message_time FROM chats"]
params = []
if query:
query_parts.append("WHERE title LIKE ? OR username LIKE ?")
params.extend([f"%{query}%", f"%{query}%"])
query_parts.append("ORDER BY last_message_time DESC")
query_parts.append("LIMIT ?")
params.append(limit)
cursor.execute(" ".join(query_parts), tuple(params))
chats = cursor.fetchall()
results = []
for chat in chats:
last_message_time = datetime.fromisoformat(chat[4]) if chat[4] else None
results.append(
{
"id": chat[0],
"title": chat[1],
"username": chat[2],
"type": chat[3],
"last_message_time": last_message_time,
}
)
conn.close()
return results
# Create message store
message_store = MessageStore(DB_PATH)
async def process_message(message: Message) -> None:
"""Process and store a message."""
if not message.text:
return # Skip non-text messages
# Get the chat
chat = message.chat
if not chat:
return
chat_id = message.chat_id
# Determine chat type and name
if isinstance(chat, User):
chat_type = "user"
title = get_display_name(chat)
username = chat.username
elif isinstance(chat, Chat):
chat_type = "group"
title = chat.title
username = None
elif isinstance(chat, Channel):
chat_type = "channel" if chat.broadcast else "supergroup"
title = chat.title
username = chat.username
else:
logger.warning(f"Unknown chat type: {type(chat)}")
return
# Store chat information
message_store.store_chat(
chat_id=chat_id,
title=title,
username=username,
chat_type=chat_type,
last_message_time=message.date,
)
# Get sender information
sender = await message.get_sender()
sender_id = sender.id if sender else 0
sender_name = get_display_name(sender) if sender else "Unknown"
# Check if the message is from the current user
my_id = (await client.get_me()).id
is_from_me = sender_id == my_id
# Store the message
message_store.store_message(
message_id=message.id,
chat_id=chat_id,
sender_id=sender_id,
sender_name=sender_name,
content=message.text,
timestamp=message.date,
is_from_me=is_from_me,
)
logger.info(
f"Stored message: [{message.date}] {sender_name} in {title}: {message.text[:30]}..."
)
async def sync_dialog_history(dialog: Dialog, limit: int = 100) -> None:
"""Sync message history for a specific dialog."""
chat_entity = dialog.entity
# Extract chat info
if isinstance(chat_entity, User):
chat_type = "user"
title = get_display_name(chat_entity)
username = chat_entity.username
elif isinstance(chat_entity, Chat):
chat_type = "group"
title = chat_entity.title
username = None
elif isinstance(chat_entity, Channel):
chat_type = "channel" if chat_entity.broadcast else "supergroup"
title = chat_entity.title
username = chat_entity.username
else:
logger.warning(f"Unknown chat type: {type(chat_entity)}")
return
# Store chat info with last message time
message_store.store_chat(
chat_id=dialog.id,
title=title,
username=username,
chat_type=chat_type,
last_message_time=dialog.date,
)
# Get messages
messages = await client.get_messages(dialog.entity, limit=limit)
# Get current user ID
my_id = (await client.get_me()).id
# Process each message
for message in messages:
if not message.text:
continue # Skip non-text messages
# Get sender information
try:
sender = await message.get_sender()
sender_id = sender.id if sender else 0
sender_name = get_display_name(sender) if sender else "Unknown"
except Exception as e:
logger.error(f"Error getting sender: {e}")
sender_id = 0
sender_name = "Unknown"
is_from_me = sender_id == my_id
# Store the message
message_store.store_message(
message_id=message.id,
chat_id=dialog.id,
sender_id=sender_id,
sender_name=sender_name,
content=message.text,
timestamp=message.date,
is_from_me=is_from_me,
)
logger.info(f"Synced {len(messages)} messages from {title}")
async def sync_all_dialogs() -> None:
"""Sync message history for all dialogs."""
logger.info("Starting synchronization of all dialogs")
# Get all dialogs (chats)
dialogs = await client.get_dialogs(limit=100)
for dialog in dialogs:
try:
await sync_dialog_history(dialog)
except Exception as e:
logger.error(f"Error syncing dialog {dialog.name}: {e}")
logger.info(f"Completed synchronization of {len(dialogs)} dialogs")
# HTTP server for API endpoints
class TelegramAPIHandler(BaseHTTPRequestHandler):
def do_POST(self):
content_length = int(self.headers["Content-Length"])
post_data = self.rfile.read(content_length)
request = json.loads(post_data.decode("utf-8"))
if self.path == "/api/send":
self._handle_send_message(request)
else:
self.send_error(404, "Endpoint not found")
def _handle_send_message(self, request):
recipient = request.get("recipient")
message_text = request.get("message")
if not recipient or not message_text:
self._send_json_response(
400, {"success": False, "message": "Recipient and message are required"}
)
return
try:
# Instead of creating a new event loop, use a shared queue to communicate with the main thread
# Create a Future to hold the result
future = asyncio.run_coroutine_threadsafe(
send_message(recipient, message_text), main_loop
)
# Wait for the result (with timeout)
try:
success, message = future.result(10) # Wait up to 10 seconds
self._send_json_response(
200 if success else 500, {"success": success, "message": message}
)
except asyncio.TimeoutError:
self._send_json_response(
504,
{
"success": False,
"message": "Request timed out while sending message",
},
)
except Exception as inner_e:
logger.error(f"Error in Future: {inner_e}")
self._send_json_response(
500,
{"success": False, "message": f"Error in Future: {str(inner_e)}"},
)
except Exception as e:
logger.error(f"Error sending message: {e}")
self._send_json_response(
500, {"success": False, "message": f"Error: {str(e)}"}
)
def _send_json_response(self, status_code, data):
self.send_response(status_code)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(data).encode("utf-8"))
async def send_message(recipient: str, message: str) -> Tuple[bool, str]:
"""Send a message to a Telegram recipient."""
if not client.is_connected():
return False, "Not connected to Telegram"
try:
# Try to parse recipient as an integer (chat ID)
try:
chat_id = int(recipient)
entity = await client.get_entity(chat_id)
except ValueError:
# Not an integer, try as username
if recipient.startswith("@"):
recipient = recipient[1:] # Remove @ if present
try:
entity = await client.get_entity(recipient)
except Exception:
# Try to find in database
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute(
"SELECT id FROM chats WHERE title LIKE ? OR username = ?",
(f"%{recipient}%", recipient),
)
result = cursor.fetchone()
conn.close()
if result:
entity = await client.get_entity(result[0])
else:
return False, f"Recipient not found: {recipient}"
# Send the message
await client.send_message(entity, message)
return True, f"Message sent to {recipient}"
except Exception as e:
logger.error(f"Error sending message: {e}")
return False, f"Error sending message: {str(e)}"
# Start HTTP server in a separate thread
def start_http_server(port: int = 8081):
server_address = ("", port)
httpd = HTTPServer(server_address, TelegramAPIHandler)
logger.info(f"Starting HTTP server on port {port}")
httpd.serve_forever()
async def main():
global main_loop
# Store the current event loop
main_loop = asyncio.get_event_loop()
logger.info("Starting Telegram bridge")
# Connect to Telegram
await client.connect()
# Check if we're already authorized
if not await client.is_user_authorized():
if not await telegram_service.authorize():
logger.info("Need to log in. Please enter your phone number:")
phone = input("Phone number: ")
await client.send_code_request(phone)
# Send the code request
await telegram_service.client.send_code_request(phone)
logger.info("Code sent. Please enter the code you received:")
code = input("Code: ")
try:
await client.sign_in(phone, code)
success = await telegram_service.login(phone, code)
if not success:
logger.error("Failed to log in with the provided code")
return False
except Exception as e:
logger.error(f"Error signing in: {e}")
logger.info(
"If you have two-factor authentication enabled, please enter your password:"
)
password = input("Password: ")
await client.sign_in(password=password)
success = await telegram_service.login(phone, code, password)
if not success:
logger.error("Failed to log in with the provided password")
return False
logger.info("Successfully logged in to Telegram")
return True
# Start HTTP server in a separate thread
server_thread = threading.Thread(target=start_http_server)
server_thread.daemon = True
server_thread.start()
# Register event handler for new messages
@client.on(events.NewMessage)
async def handle_new_message(event):
await process_message(event.message)
async def main():
"""Main application entry point."""
try:
# Initialize the application
global telegram_service
telegram_service = await init_app()
# Login to Telegram if needed
if not await login_flow():
logger.error("Failed to authenticate with Telegram")
return
# Initial sync of message history
await sync_all_dialogs()
await telegram_service.sync_all_dialogs()
# Start the API server
logger.info(f"Starting FastAPI server on {HTTP_HOST}:{HTTP_PORT}")
config = uvicorn.Config(app=app, host=HTTP_HOST, port=HTTP_PORT, log_level="info")
server = uvicorn.Server(config)
# Keep the script running
logger.info("Telegram bridge is running. Press Ctrl+C to exit.")
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
pass
await server.serve()
if __name__ == "__main__":
# Run the main function
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Shutting down Telegram bridge")
except Exception as e:
logger.error(f"Unexpected error: {e}")
sys.exit(1)
if __name__ == "__main__":
# Run the main function
asyncio.run(main())

View file

@ -0,0 +1,7 @@
"""
Server module for the Telegram bridge.
Provides FastAPI application for HTTP endpoints.
"""
from server.app import app, get_telegram_service

View file

@ -0,0 +1,170 @@
"""
FastAPI application for the Telegram bridge.
Provides HTTP API endpoints for interacting with Telegram.
"""
from fastapi import FastAPI, HTTPException, Depends
from typing import List, Optional
from api.models import (
ChatModel,
MessageModel,
MessageContextModel,
SendMessageRequest,
SendMessageResponse
)
from service import TelegramService
# Create FastAPI app
app = FastAPI(
title="Telegram Bridge API",
description="API for interacting with Telegram",
version="1.0.0"
)
# Service dependency
def get_telegram_service() -> TelegramService:
"""Get the Telegram service instance.
This function should be replaced with a proper dependency injection
mechanism to return the singleton instance of the TelegramService.
"""
# This is a placeholder. In main.py, we'll set this to the actual service instance
raise NotImplementedError("Telegram service not initialized")
@app.get("/api/chats", response_model=List[ChatModel])
async def list_chats(
query: Optional[str] = None,
limit: int = 50,
offset: int = 0,
chat_type: Optional[str] = None,
sort_by: str = "last_message_time",
service: TelegramService = Depends(get_telegram_service)
):
"""List chats."""
chats = service.chat_repo.get_chats(
query=query,
limit=limit,
offset=offset,
chat_type=chat_type,
sort_by=sort_by
)
return [
ChatModel(
id=chat.id,
title=chat.title,
username=chat.username,
type=chat.type,
last_message_time=chat.last_message_time
) for chat in chats
]
@app.get("/api/messages", response_model=List[MessageModel])
async def list_messages(
chat_id: Optional[int] = None,
sender_id: Optional[int] = None,
query: Optional[str] = None,
limit: int = 50,
offset: int = 0,
service: TelegramService = Depends(get_telegram_service)
):
"""List messages."""
messages = service.message_repo.get_messages(
chat_id=chat_id,
sender_id=sender_id,
query=query,
limit=limit,
offset=offset
)
return [
MessageModel(
id=msg.id,
chat_id=msg.chat_id,
chat_title=msg.chat.title,
sender_id=msg.sender_id,
sender_name=msg.sender_name,
content=msg.content,
timestamp=msg.timestamp,
is_from_me=msg.is_from_me
) for msg in messages
]
@app.get("/api/messages/{chat_id}/{message_id}/context", response_model=MessageContextModel)
async def get_message_context(
chat_id: int,
message_id: int,
before: int = 5,
after: int = 5,
service: TelegramService = Depends(get_telegram_service)
):
"""Get context around a message."""
try:
context = service.message_repo.get_message_context(
message_id=message_id,
chat_id=chat_id,
before=before,
after=after
)
# Convert to model
target_message = MessageModel(
id=context["message"].id,
chat_id=context["message"].chat_id,
chat_title=context["message"].chat.title,
sender_id=context["message"].sender_id,
sender_name=context["message"].sender_name,
content=context["message"].content,
timestamp=context["message"].timestamp,
is_from_me=context["message"].is_from_me
)
before_messages = [
MessageModel(
id=msg.id,
chat_id=msg.chat_id,
chat_title=msg.chat.title,
sender_id=msg.sender_id,
sender_name=msg.sender_name,
content=msg.content,
timestamp=msg.timestamp,
is_from_me=msg.is_from_me
) for msg in context["before"]
]
after_messages = [
MessageModel(
id=msg.id,
chat_id=msg.chat_id,
chat_title=msg.chat.title,
sender_id=msg.sender_id,
sender_name=msg.sender_name,
content=msg.content,
timestamp=msg.timestamp,
is_from_me=msg.is_from_me
) for msg in context["after"]
]
return MessageContextModel(
message=target_message,
before=before_messages,
after=after_messages
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@app.post("/api/send", response_model=SendMessageResponse)
async def send_message(
request: SendMessageRequest,
service: TelegramService = Depends(get_telegram_service)
):
"""Send a message to a Telegram recipient."""
success, message = await service.send_message(
recipient=request.recipient,
message=request.message
)
return SendMessageResponse(success=success, message=message)

224
telegram-bridge/service.py Normal file
View file

@ -0,0 +1,224 @@
"""
Service layer for the Telegram bridge.
Connects the API middleware with database repositories to provide
high-level operations for the application.
"""
import logging
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime
from telethon import events
from api import TelegramApiClient, TelegramMiddleware
from database import ChatRepository, MessageRepository
logger = logging.getLogger(__name__)
class TelegramService:
"""Service for Telegram operations."""
def __init__(
self,
telegram_client: TelegramApiClient,
middleware: TelegramMiddleware,
chat_repo: ChatRepository,
message_repo: MessageRepository
):
"""Initialize the service.
Args:
telegram_client: Telegram API client
middleware: Telegram middleware
chat_repo: Chat repository
message_repo: Message repository
"""
self.client = telegram_client
self.middleware = middleware
self.chat_repo = chat_repo
self.message_repo = message_repo
async def setup(self) -> None:
"""Set up the service, connect to Telegram, and register handlers."""
# Connect to Telegram
await self.client.connect()
# Register event handlers
self.client.add_event_handler(self._handle_new_message, events.NewMessage)
async def authorize(self) -> bool:
"""Authorize with Telegram if needed."""
if await self.client.is_authorized():
logger.info("Already authorized with Telegram")
return True
logger.info("Not authorized with Telegram. Interactive login required.")
return False
async def login(self, phone: str, code: str, password: Optional[str] = None) -> bool:
"""Login to Telegram.
Args:
phone: Phone number
code: Verification code
password: Two-factor authentication password (optional)
Returns:
bool: True if login successful, False otherwise
"""
if password:
return await self.client.sign_in(phone=phone, code=code, password=password)
else:
# First send code request
await self.client.send_code_request(phone)
# Then sign in with the code
return await self.client.sign_in(phone=phone, code=code)
async def sync_all_dialogs(self, limit: int = 100) -> None:
"""Sync all dialogs (chats) from Telegram.
Args:
limit: Maximum number of dialogs to retrieve
"""
logger.info("Starting synchronization of all dialogs")
# Get all dialogs (chats)
dialogs = await self.client.get_dialogs(limit=limit)
for dialog in dialogs:
try:
await self.sync_dialog_history(dialog)
except Exception as e:
logger.error(f"Error syncing dialog {dialog.name}: {e}")
logger.info(f"Completed synchronization of {len(dialogs)} dialogs")
async def sync_dialog_history(self, dialog, limit: int = 100) -> None:
"""Sync message history for a specific dialog.
Args:
dialog: Dialog to sync
limit: Maximum number of messages to retrieve
"""
# Process dialog entity
chat_info = await self.middleware.process_dialog(dialog)
if not chat_info:
logger.warning(f"Could not process dialog: {dialog}")
return
# Store chat information
self.chat_repo.store_chat(
chat_id=chat_info["id"],
title=chat_info["title"],
username=chat_info.get("username"),
chat_type=chat_info["type"],
last_message_time=chat_info["last_message_time"]
)
# Get messages
messages = await self.client.get_messages(dialog.entity, limit=limit)
# Process each message
for message in messages:
msg_info = await self.middleware.process_message(message)
if msg_info:
self.message_repo.store_message(
message_id=msg_info["id"],
chat_id=msg_info["chat_id"],
sender_id=msg_info["sender_id"],
sender_name=msg_info["sender_name"],
content=msg_info["content"],
timestamp=msg_info["timestamp"],
is_from_me=msg_info["is_from_me"]
)
logger.info(f"Synced {len(messages)} messages from {chat_info['title']}")
async def send_message(self, recipient: str, message: str) -> Tuple[bool, str]:
"""Send a message to a Telegram recipient.
Args:
recipient: Recipient identifier (ID, username, or title)
message: Message text to send
Returns:
Tuple[bool, str]: Success status and message
"""
if not self.client.client.is_connected():
return False, "Not connected to Telegram"
entity = await self.middleware.find_entity_by_name_or_id(recipient)
if not entity:
# Try to find in database
try:
# Try to parse as integer
chat_id = int(recipient)
chat = self.chat_repo.get_chat_by_id(chat_id)
if chat:
entity = await self.client.get_entity(chat_id)
except ValueError:
# Not an integer, try to find by name
chats = self.chat_repo.get_chats(query=recipient, limit=1)
if chats:
entity = await self.client.get_entity(chats[0].id)
if not entity:
return False, f"Recipient not found: {recipient}"
# Send the message
sent_message = await self.client.send_message(entity, message)
if sent_message:
# Process and store the sent message
msg_info = await self.middleware.process_message(sent_message)
if msg_info:
self.message_repo.store_message(
message_id=msg_info["id"],
chat_id=msg_info["chat_id"],
sender_id=msg_info["sender_id"],
sender_name=msg_info["sender_name"],
content=msg_info["content"],
timestamp=msg_info["timestamp"],
is_from_me=msg_info["is_from_me"]
)
return True, f"Message sent to {recipient}"
else:
return False, f"Failed to send message to {recipient}"
async def _handle_new_message(self, event) -> None:
"""Handle a new message event from Telegram."""
message = event.message
msg_info = await self.middleware.process_message(message)
if msg_info:
# Process and store chat information
chat_entity = message.chat
if chat_entity:
chat_info = await self.middleware.process_chat_entity(chat_entity)
self.chat_repo.store_chat(
chat_id=chat_info["id"],
title=chat_info["title"],
username=chat_info.get("username"),
chat_type=chat_info["type"],
last_message_time=message.date
)
# Store the message
self.message_repo.store_message(
message_id=msg_info["id"],
chat_id=msg_info["chat_id"],
sender_id=msg_info["sender_id"],
sender_name=msg_info["sender_name"],
content=msg_info["content"],
timestamp=msg_info["timestamp"],
is_from_me=msg_info["is_from_me"]
)
logger.info(
f"Stored message: [{msg_info['timestamp']}] {msg_info['sender_name']} "
f"in {msg_info['chat_title']}: {msg_info['content'][:30]}..."
)

View file

@ -1,620 +0,0 @@
"""
This module implements data models and functions for retrieving and sending
Telegram messages, managing chats, and working with contacts.
The module connects to a SQLite database that stores all Telegram messages and chat data,
which is maintained by the Telegram Bridge. It also provides an HTTP client for sending
messages via the Bridge's API endpoint.
Main features:
- Data models for messages, chats, contacts, and message context
- Database access functions for retrieving messages, chats, and contacts
- HTTP client for sending messages through the Telegram Bridge
- Helper functions for displaying formatted messages and chats
All database operations use parameterized queries to prevent SQL injection.
"""
import sqlite3
import requests
import json
import os.path
from datetime import datetime
from dataclasses import dataclass
from typing import Optional, List, Tuple, Dict, Any
# Database path
MESSAGES_DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'telegram-bridge', 'store', 'messages.db')
TELEGRAM_API_BASE_URL = "http://localhost:8081/api"
@dataclass
class Message:
"""
Represents a Telegram message with all its metadata.
Attributes:
id: Unique message identifier
chat_id: ID of the chat the message belongs to
chat_title: Title of the chat (user name, group name, etc.)
sender_name: Name of the message sender
content: Text content of the message
timestamp: Date and time when the message was sent
is_from_me: Boolean indicating if the message was sent by the user
sender_id: ID of the message sender
"""
id: int
chat_id: int
chat_title: str
sender_name: str
content: str
timestamp: datetime
is_from_me: bool
sender_id: int
@dataclass
class Chat:
"""
Represents a Telegram chat (direct message, group, channel, etc.).
Attributes:
id: Unique chat identifier
title: Name of the chat (user name, group name, etc.)
username: Optional Telegram username (without @)
type: Type of chat ('user', 'group', 'channel', 'supergroup')
last_message_time: Timestamp of the most recent message in the chat
"""
id: int
title: str
username: Optional[str]
type: str
last_message_time: Optional[datetime]
@dataclass
class Contact:
"""
Represents a Telegram contact.
Attributes:
id: Unique contact identifier
username: Optional Telegram username (without @)
name: Display name of the contact
"""
id: int
username: Optional[str]
name: str
@dataclass
class MessageContext:
"""
Provides context around a specific message.
Attributes:
message: The target message
before: List of messages that came before the target message
after: List of messages that came after the target message
"""
message: Message
before: List[Message]
after: List[Message]
def print_message(message: Message, show_chat_info: bool = True) -> None:
"""Print a single message with consistent formatting."""
direction = "" if message.is_from_me else ""
if show_chat_info:
print(f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] {direction} Chat: {message.chat_title} (ID: {message.chat_id})")
else:
print(f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] {direction}")
print(f"From: {'Me' if message.is_from_me else message.sender_name}")
print(f"Message: {message.content}")
print("-" * 100)
def print_messages_list(messages: List[Message], title: str = "", show_chat_info: bool = True) -> None:
"""Print a list of messages with a title and consistent formatting."""
if not messages:
print("No messages to display.")
return
if title:
print(f"\n{title}")
print("-" * 100)
for message in messages:
print_message(message, show_chat_info)
def print_chat(chat: Chat) -> None:
"""Print a single chat with consistent formatting."""
print(f"Chat: {chat.title} (ID: {chat.id})")
print(f"Type: {chat.type}")
if chat.username:
print(f"Username: @{chat.username}")
if chat.last_message_time:
print(f"Last active: {chat.last_message_time:%Y-%m-%d %H:%M:%S}")
print("-" * 100)
def print_chats_list(chats: List[Chat], title: str = "") -> None:
"""Print a list of chats with a title and consistent formatting."""
if not chats:
print("No chats to display.")
return
if title:
print(f"\n{title}")
print("-" * 100)
for chat in chats:
print_chat(chat)
def search_contacts(query: str) -> List[Contact]:
"""Search contacts by name or username."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# Search in chats where type is 'user'
cursor.execute("""
SELECT id, username, title
FROM chats
WHERE type = 'user' AND (title LIKE ? OR username LIKE ?)
ORDER BY title
LIMIT 50
""", (f"%{query}%", f"%{query}%"))
contacts = cursor.fetchall()
result = []
for contact_data in contacts:
contact = Contact(
id=contact_data[0],
username=contact_data[1],
name=contact_data[2]
)
result.append(contact)
return result
except sqlite3.Error as e:
print(f"Database error: {e}")
return []
finally:
if 'conn' in locals():
conn.close()
def list_messages(
date_range: Optional[Tuple[datetime, datetime]] = None,
sender_id: Optional[int] = None,
chat_id: Optional[int] = None,
query: Optional[str] = None,
limit: int = 20,
page: int = 0,
include_context: bool = True,
context_before: int = 1,
context_after: int = 1
) -> List[Message]:
"""Get messages matching the specified criteria with optional context."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# Build base query
query_parts = ["""
SELECT
m.id,
m.chat_id,
c.title,
m.sender_name,
m.content,
m.timestamp,
m.is_from_me,
m.sender_id
FROM messages m
"""]
query_parts.append("JOIN chats c ON m.chat_id = c.id")
where_clauses = []
params = []
# Add filters
if date_range:
where_clauses.append("m.timestamp BETWEEN ? AND ?")
params.extend([date_range[0].isoformat(), date_range[1].isoformat()])
if sender_id:
where_clauses.append("m.sender_id = ?")
params.append(sender_id)
if chat_id:
where_clauses.append("m.chat_id = ?")
params.append(chat_id)
if query:
where_clauses.append("LOWER(m.content) LIKE LOWER(?)")
params.append(f"%{query}%")
if where_clauses:
query_parts.append("WHERE " + " AND ".join(where_clauses))
# Add pagination
offset = page * limit
query_parts.append("ORDER BY m.timestamp DESC")
query_parts.append("LIMIT ? OFFSET ?")
params.extend([limit, offset])
cursor.execute(" ".join(query_parts), tuple(params))
messages = cursor.fetchall()
result = []
for msg in messages:
message = Message(
id=msg[0],
chat_id=msg[1],
chat_title=msg[2],
sender_name=msg[3],
content=msg[4],
timestamp=datetime.fromisoformat(msg[5]),
is_from_me=bool(msg[6]),
sender_id=msg[7]
)
result.append(message)
if include_context and result:
# Add context for each message
messages_with_context = []
for msg in result:
context = get_message_context(msg.id, msg.chat_id, context_before, context_after)
messages_with_context.extend(context.before)
messages_with_context.append(context.message)
messages_with_context.extend(context.after)
return messages_with_context
return result
except sqlite3.Error as e:
print(f"Database error: {e}")
return []
finally:
if 'conn' in locals():
conn.close()
def get_message_context(
message_id: int,
chat_id: int,
before: int = 5,
after: int = 5
) -> MessageContext:
"""Get context around a specific message."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# Get the target message first
cursor.execute("""
SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id
FROM messages m
JOIN chats c ON m.chat_id = c.id
WHERE m.id = ? AND m.chat_id = ?
""", (message_id, chat_id))
msg_data = cursor.fetchone()
if not msg_data:
raise ValueError(f"Message with ID {message_id} in chat {chat_id} not found")
target_message = Message(
id=msg_data[0],
chat_id=msg_data[1],
chat_title=msg_data[2],
sender_name=msg_data[3],
content=msg_data[4],
timestamp=datetime.fromisoformat(msg_data[5]),
is_from_me=bool(msg_data[6]),
sender_id=msg_data[7]
)
# Get messages before
cursor.execute("""
SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id
FROM messages m
JOIN chats c ON m.chat_id = c.id
WHERE m.chat_id = ? AND m.timestamp < ?
ORDER BY m.timestamp DESC
LIMIT ?
""", (chat_id, target_message.timestamp.isoformat(), before))
before_messages = []
for msg in cursor.fetchall():
before_messages.append(Message(
id=msg[0],
chat_id=msg[1],
chat_title=msg[2],
sender_name=msg[3],
content=msg[4],
timestamp=datetime.fromisoformat(msg[5]),
is_from_me=bool(msg[6]),
sender_id=msg[7]
))
# Get messages after
cursor.execute("""
SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id
FROM messages m
JOIN chats c ON m.chat_id = c.id
WHERE m.chat_id = ? AND m.timestamp > ?
ORDER BY m.timestamp ASC
LIMIT ?
""", (chat_id, target_message.timestamp.isoformat(), after))
after_messages = []
for msg in cursor.fetchall():
after_messages.append(Message(
id=msg[0],
chat_id=msg[1],
chat_title=msg[2],
sender_name=msg[3],
content=msg[4],
timestamp=datetime.fromisoformat(msg[5]),
is_from_me=bool(msg[6]),
sender_id=msg[7]
))
return MessageContext(
message=target_message,
before=before_messages,
after=after_messages
)
except sqlite3.Error as e:
print(f"Database error: {e}")
raise
finally:
if 'conn' in locals():
conn.close()
def list_chats(
query: Optional[str] = None,
limit: int = 20,
page: int = 0,
chat_type: Optional[str] = None,
sort_by: str = "last_active"
) -> List[Chat]:
"""Get chats matching the specified criteria."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# Build base query
query_parts = ["SELECT id, title, username, type, last_message_time FROM chats"]
where_clauses = []
params = []
if query:
where_clauses.append("(LOWER(title) LIKE LOWER(?) OR LOWER(username) LIKE LOWER(?))")
params.extend([f"%{query}%", f"%{query}%"])
if chat_type:
where_clauses.append("type = ?")
params.append(chat_type)
if where_clauses:
query_parts.append("WHERE " + " AND ".join(where_clauses))
# Add sorting
order_by = "last_message_time DESC" if sort_by == "last_active" else "title"
query_parts.append(f"ORDER BY {order_by}")
# Add pagination
offset = (page) * limit
query_parts.append("LIMIT ? OFFSET ?")
params.extend([limit, offset])
cursor.execute(" ".join(query_parts), tuple(params))
chats = cursor.fetchall()
result = []
for chat_data in chats:
last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None
chat = Chat(
id=chat_data[0],
title=chat_data[1],
username=chat_data[2],
type=chat_data[3],
last_message_time=last_message_time
)
result.append(chat)
return result
except sqlite3.Error as e:
print(f"Database error: {e}")
return []
finally:
if 'conn' in locals():
conn.close()
def get_chat(chat_id: int) -> Optional[Chat]:
"""Get chat metadata by ID."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT id, title, username, type, last_message_time
FROM chats
WHERE id = ?
""", (chat_id,))
chat_data = cursor.fetchone()
if not chat_data:
return None
last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None
return Chat(
id=chat_data[0],
title=chat_data[1],
username=chat_data[2],
type=chat_data[3],
last_message_time=last_message_time
)
except sqlite3.Error as e:
print(f"Database error: {e}")
return None
finally:
if 'conn' in locals():
conn.close()
def get_direct_chat_by_contact(contact_id: int) -> Optional[Chat]:
"""Get direct chat metadata by contact ID."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT id, title, username, type, last_message_time
FROM chats
WHERE id = ? AND type = 'user'
""", (contact_id,))
chat_data = cursor.fetchone()
if not chat_data:
return None
last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None
return Chat(
id=chat_data[0],
title=chat_data[1],
username=chat_data[2],
type=chat_data[3],
last_message_time=last_message_time
)
except sqlite3.Error as e:
print(f"Database error: {e}")
return None
finally:
if 'conn' in locals():
conn.close()
def get_contact_chats(contact_id: int, limit: int = 20, page: int = 0) -> List[Chat]:
"""Get all chats involving the contact.
Args:
contact_id: The contact's ID to search for
limit: Maximum number of chats to return (default 20)
page: Page number for pagination (default 0)
"""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT
c.id, c.title, c.username, c.type, c.last_message_time
FROM chats c
JOIN messages m ON c.id = m.chat_id
WHERE m.sender_id = ? OR c.id = ?
ORDER BY c.last_message_time DESC
LIMIT ? OFFSET ?
""", (contact_id, contact_id, limit, page * limit))
chats = cursor.fetchall()
result = []
for chat_data in chats:
last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None
chat = Chat(
id=chat_data[0],
title=chat_data[1],
username=chat_data[2],
type=chat_data[3],
last_message_time=last_message_time
)
result.append(chat)
return result
except sqlite3.Error as e:
print(f"Database error: {e}")
return []
finally:
if 'conn' in locals():
conn.close()
def get_last_interaction(contact_id: int) -> Optional[Message]:
"""Get most recent message involving the contact."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT
m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id
FROM messages m
JOIN chats c ON m.chat_id = c.id
WHERE m.sender_id = ? OR c.id = ?
ORDER BY m.timestamp DESC
LIMIT 1
""", (contact_id, contact_id))
msg_data = cursor.fetchone()
if not msg_data:
return None
return Message(
id=msg_data[0],
chat_id=msg_data[1],
chat_title=msg_data[2],
sender_name=msg_data[3],
content=msg_data[4],
timestamp=datetime.fromisoformat(msg_data[5]),
is_from_me=bool(msg_data[6]),
sender_id=msg_data[7]
)
except sqlite3.Error as e:
print(f"Database error: {e}")
return None
finally:
if 'conn' in locals():
conn.close()
def send_message(recipient: str, message: str) -> Tuple[bool, str]:
"""Send a Telegram message to the specified recipient.
Args:
recipient: The recipient - either a username (with or without @),
or a chat ID as a string or integer
message: The message text to send
Returns:
Tuple[bool, str]: A tuple containing success status and a status message
"""
try:
# Validate input
if not recipient:
return False, "Recipient must be provided"
url = f"{TELEGRAM_API_BASE_URL}/send"
payload = {
"recipient": recipient,
"message": message
}
response = requests.post(url, json=payload)
# Check if the request was successful
if response.status_code == 200:
result = response.json()
return result.get("success", False), result.get("message", "Unknown response")
else:
return False, f"Error: HTTP {response.status_code} - {response.text}"
except requests.RequestException as e:
return False, f"Request error: {str(e)}"
except json.JSONDecodeError:
return False, f"Error parsing response: Unknown"
except Exception as e:
return False, f"Unexpected error: {str(e)}"

View file

@ -0,0 +1,29 @@
"""
This module implements data models and functions for retrieving and sending
Telegram messages, managing chats, and working with contacts.
The module connects to a SQLite database that stores all Telegram messages and chat data,
which is maintained by the Telegram Bridge. It also provides an HTTP client for sending
messages via the Bridge's API endpoint.
Main features:
- Data models for messages, chats, contacts, and message context
- Database access functions for retrieving messages, chats, and contacts
- HTTP client for sending messages through the Telegram Bridge
- Helper functions for displaying formatted messages and chats
"""
import os.path
# Database path
MESSAGES_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '..', 'telegram-bridge', 'store', 'messages.db')
TELEGRAM_API_BASE_URL = "http://localhost:8081/api"
# Import all components to make them available at the module level
from .models import Message, Chat, Contact, MessageContext
from .display import print_message, print_messages_list, print_chat, print_chats_list
from .api import send_message
from .database import (
search_contacts, list_messages, get_message_context, list_chats,
get_chat, get_direct_chat_by_contact, get_contact_chats, get_last_interaction
)

View file

@ -0,0 +1,47 @@
"""
API client for interacting with the Telegram Bridge API.
"""
import requests
import json
from typing import Tuple
from . import TELEGRAM_API_BASE_URL
def send_message(recipient: str, message: str) -> Tuple[bool, str]:
"""Send a Telegram message to the specified recipient.
Args:
recipient: The recipient - either a username (with or without @),
or a chat ID as a string or integer
message: The message text to send
Returns:
Tuple[bool, str]: A tuple containing success status and a status message
"""
try:
# Validate input
if not recipient:
return False, "Recipient must be provided"
url = f"{TELEGRAM_API_BASE_URL}/send"
payload = {
"recipient": recipient,
"message": message
}
response = requests.post(url, json=payload)
# Check if the request was successful
if response.status_code == 200:
result = response.json()
return result.get("success", False), result.get("message", "Unknown response")
else:
return False, f"Error: HTTP {response.status_code} - {response.text}"
except requests.RequestException as e:
return False, f"Request error: {str(e)}"
except json.JSONDecodeError:
return False, f"Error parsing response: Unknown"
except Exception as e:
return False, f"Unexpected error: {str(e)}"

View file

@ -0,0 +1,414 @@
"""
Database operations for retrieving and managing Telegram data.
Uses SQLAlchemy ORM for database access instead of raw SQL.
"""
from sqlalchemy import create_engine, or_, and_, desc
from sqlalchemy.orm import sessionmaker, scoped_session
from datetime import datetime
from typing import Optional, List, Tuple
from . import MESSAGES_DB_PATH
from .models import Message, Chat, Contact, MessageContext
# Initialize SQLAlchemy engine and session
engine = create_engine(
f"sqlite:///{MESSAGES_DB_PATH}",
connect_args={"check_same_thread": False} # Needed for SQLite
)
SessionFactory = sessionmaker(bind=engine)
Session = scoped_session(SessionFactory)
# Import SQLAlchemy models from telegram-bridge
import sys
import os
# Add the parent directory to path to find telegram-bridge
bridge_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), 'telegram-bridge')
sys.path.append(bridge_path)
# Import models directly from telegram-bridge
from database.models import Chat as DBChat, Message as DBMessage
def search_contacts(query: str) -> List[Contact]:
"""Search contacts by name or username."""
try:
session = Session()
# Search in chats where type is 'user'
db_contacts = session.query(DBChat).filter(
DBChat.type == 'user',
or_(
DBChat.title.ilike(f"%{query}%"),
DBChat.username.ilike(f"%{query}%")
)
).order_by(DBChat.title).limit(50).all()
result = []
for db_contact in db_contacts:
contact = Contact(
id=db_contact.id,
username=db_contact.username,
name=db_contact.title
)
result.append(contact)
return result
except Exception as e:
print(f"Database error: {e}")
return []
finally:
session.close()
def list_messages(
date_range: Optional[Tuple[datetime, datetime]] = None,
sender_id: Optional[int] = None,
chat_id: Optional[int] = None,
query: Optional[str] = None,
limit: int = 20,
page: int = 0,
include_context: bool = True,
context_before: int = 1,
context_after: int = 1
) -> List[Message]:
"""Get messages matching the specified criteria with optional context."""
try:
session = Session()
# Build base query with join
db_query = session.query(DBMessage, DBChat).join(DBChat)
# Add filters
filters = []
if date_range:
filters.append(and_(
DBMessage.timestamp >= date_range[0],
DBMessage.timestamp <= date_range[1]
))
if sender_id:
filters.append(DBMessage.sender_id == sender_id)
if chat_id:
filters.append(DBMessage.chat_id == chat_id)
if query:
filters.append(DBMessage.content.ilike(f"%{query}%"))
if filters:
db_query = db_query.filter(and_(*filters))
# Add pagination
offset = page * limit
db_query = db_query.order_by(desc(DBMessage.timestamp))
db_query = db_query.limit(limit).offset(offset)
# Execute query
db_results = db_query.all()
result = []
for db_msg, db_chat in db_results:
message = Message(
id=db_msg.id,
chat_id=db_msg.chat_id,
chat_title=db_chat.title,
sender_name=db_msg.sender_name,
content=db_msg.content,
timestamp=db_msg.timestamp,
is_from_me=db_msg.is_from_me,
sender_id=db_msg.sender_id
)
result.append(message)
if include_context and result:
# Add context for each message
messages_with_context = []
for msg in result:
context = get_message_context(msg.id, msg.chat_id, context_before, context_after)
messages_with_context.extend(context.before)
messages_with_context.append(context.message)
messages_with_context.extend(context.after)
return messages_with_context
return result
except Exception as e:
print(f"Database error: {e}")
return []
finally:
session.close()
def get_message_context(
message_id: int,
chat_id: int,
before: int = 5,
after: int = 5
) -> MessageContext:
"""Get context around a specific message."""
try:
session = Session()
# Get the target message first
result = session.query(DBMessage, DBChat) \
.join(DBChat) \
.filter(DBMessage.id == message_id, DBMessage.chat_id == chat_id) \
.first()
if not result:
raise ValueError(f"Message with ID {message_id} in chat {chat_id} not found")
db_msg, db_chat = result
target_message = Message(
id=db_msg.id,
chat_id=db_msg.chat_id,
chat_title=db_chat.title,
sender_name=db_msg.sender_name,
content=db_msg.content,
timestamp=db_msg.timestamp,
is_from_me=db_msg.is_from_me,
sender_id=db_msg.sender_id
)
# Get messages before
before_results = session.query(DBMessage, DBChat) \
.join(DBChat) \
.filter(
DBMessage.chat_id == chat_id,
DBMessage.timestamp < target_message.timestamp
) \
.order_by(desc(DBMessage.timestamp)) \
.limit(before) \
.all()
before_messages = []
for db_msg, db_chat in before_results:
before_messages.append(Message(
id=db_msg.id,
chat_id=db_msg.chat_id,
chat_title=db_chat.title,
sender_name=db_msg.sender_name,
content=db_msg.content,
timestamp=db_msg.timestamp,
is_from_me=db_msg.is_from_me,
sender_id=db_msg.sender_id
))
# Get messages after
after_results = session.query(DBMessage, DBChat) \
.join(DBChat) \
.filter(
DBMessage.chat_id == chat_id,
DBMessage.timestamp > target_message.timestamp
) \
.order_by(DBMessage.timestamp) \
.limit(after) \
.all()
after_messages = []
for db_msg, db_chat in after_results:
after_messages.append(Message(
id=db_msg.id,
chat_id=db_msg.chat_id,
chat_title=db_chat.title,
sender_name=db_msg.sender_name,
content=db_msg.content,
timestamp=db_msg.timestamp,
is_from_me=db_msg.is_from_me,
sender_id=db_msg.sender_id
))
return MessageContext(
message=target_message,
before=before_messages,
after=after_messages
)
except Exception as e:
print(f"Database error: {e}")
raise
finally:
session.close()
def list_chats(
query: Optional[str] = None,
limit: int = 20,
page: int = 0,
chat_type: Optional[str] = None,
sort_by: str = "last_active"
) -> List[Chat]:
"""Get chats matching the specified criteria."""
try:
session = Session()
# Build base query
db_query = session.query(DBChat)
# Add filters
filters = []
if query:
filters.append(or_(
DBChat.title.ilike(f"%{query}%"),
DBChat.username.ilike(f"%{query}%")
))
if chat_type:
filters.append(DBChat.type == chat_type)
if filters:
db_query = db_query.filter(and_(*filters))
# Add sorting
if sort_by == "last_active":
db_query = db_query.order_by(desc(DBChat.last_message_time))
else:
db_query = db_query.order_by(DBChat.title)
# Add pagination
offset = page * limit
db_query = db_query.limit(limit).offset(offset)
# Execute query
db_chats = db_query.all()
result = []
for db_chat in db_chats:
chat = Chat(
id=db_chat.id,
title=db_chat.title,
username=db_chat.username,
type=db_chat.type,
last_message_time=db_chat.last_message_time
)
result.append(chat)
return result
except Exception as e:
print(f"Database error: {e}")
return []
finally:
session.close()
def get_chat(chat_id: int) -> Optional[Chat]:
"""Get chat metadata by ID."""
try:
session = Session()
db_chat = session.query(DBChat).filter(DBChat.id == chat_id).first()
if not db_chat:
return None
return Chat(
id=db_chat.id,
title=db_chat.title,
username=db_chat.username,
type=db_chat.type,
last_message_time=db_chat.last_message_time
)
except Exception as e:
print(f"Database error: {e}")
return None
finally:
session.close()
def get_direct_chat_by_contact(contact_id: int) -> Optional[Chat]:
"""Get direct chat metadata by contact ID."""
try:
session = Session()
db_chat = session.query(DBChat).filter(
DBChat.id == contact_id,
DBChat.type == 'user'
).first()
if not db_chat:
return None
return Chat(
id=db_chat.id,
title=db_chat.title,
username=db_chat.username,
type=db_chat.type,
last_message_time=db_chat.last_message_time
)
except Exception as e:
print(f"Database error: {e}")
return None
finally:
session.close()
def get_contact_chats(contact_id: int, limit: int = 20, page: int = 0) -> List[Chat]:
"""Get all chats involving the contact.
Args:
contact_id: The contact's ID to search for
limit: Maximum number of chats to return (default 20)
page: Page number for pagination (default 0)
"""
try:
session = Session()
# Using a subquery to get distinct chats for the contact
db_chats = session.query(DBChat).join(DBMessage, DBChat.id == DBMessage.chat_id).filter(
or_(
DBMessage.sender_id == contact_id,
DBChat.id == contact_id
)
).distinct().order_by(desc(DBChat.last_message_time)).limit(limit).offset(page * limit).all()
result = []
for db_chat in db_chats:
chat = Chat(
id=db_chat.id,
title=db_chat.title,
username=db_chat.username,
type=db_chat.type,
last_message_time=db_chat.last_message_time
)
result.append(chat)
return result
except Exception as e:
print(f"Database error: {e}")
return []
finally:
session.close()
def get_last_interaction(contact_id: int) -> Optional[Message]:
"""Get most recent message involving the contact."""
try:
session = Session()
result = session.query(DBMessage, DBChat).join(DBChat).filter(
or_(
DBMessage.sender_id == contact_id,
DBChat.id == contact_id
)
).order_by(desc(DBMessage.timestamp)).first()
if not result:
return None
db_msg, db_chat = result
return Message(
id=db_msg.id,
chat_id=db_msg.chat_id,
chat_title=db_chat.title,
sender_name=db_msg.sender_name,
content=db_msg.content,
timestamp=db_msg.timestamp,
is_from_me=db_msg.is_from_me,
sender_id=db_msg.sender_id
)
except Exception as e:
print(f"Database error: {e}")
return None
finally:
session.close()

View file

@ -0,0 +1,55 @@
"""
Functions for displaying Telegram messages and chats in a formatted way.
"""
from typing import List
from .models import Message, Chat
def print_message(message: Message, show_chat_info: bool = True) -> None:
"""Print a single message with consistent formatting."""
direction = "" if message.is_from_me else ""
if show_chat_info:
print(f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] {direction} Chat: {message.chat_title} (ID: {message.chat_id})")
else:
print(f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] {direction}")
print(f"From: {'Me' if message.is_from_me else message.sender_name}")
print(f"Message: {message.content}")
print("-" * 100)
def print_messages_list(messages: List[Message], title: str = "", show_chat_info: bool = True) -> None:
"""Print a list of messages with a title and consistent formatting."""
if not messages:
print("No messages to display.")
return
if title:
print(f"\n{title}")
print("-" * 100)
for message in messages:
print_message(message, show_chat_info)
def print_chat(chat: Chat) -> None:
"""Print a single chat with consistent formatting."""
print(f"Chat: {chat.title} (ID: {chat.id})")
print(f"Type: {chat.type}")
if chat.username:
print(f"Username: @{chat.username}")
if chat.last_message_time:
print(f"Last active: {chat.last_message_time:%Y-%m-%d %H:%M:%S}")
print("-" * 100)
def print_chats_list(chats: List[Chat], title: str = "") -> None:
"""Print a list of chats with a title and consistent formatting."""
if not chats:
print("No chats to display.")
return
if title:
print(f"\n{title}")
print("-" * 100)
for chat in chats:
print_chat(chat)

View file

@ -0,0 +1,77 @@
"""
Data models for Telegram entities.
"""
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List
@dataclass
class Message:
"""
Represents a Telegram message with all its metadata.
Attributes:
id: Unique message identifier
chat_id: ID of the chat the message belongs to
chat_title: Title of the chat (user name, group name, etc.)
sender_name: Name of the message sender
content: Text content of the message
timestamp: Date and time when the message was sent
is_from_me: Boolean indicating if the message was sent by the user
sender_id: ID of the message sender
"""
id: int
chat_id: int
chat_title: str
sender_name: str
content: str
timestamp: datetime
is_from_me: bool
sender_id: int
@dataclass
class Chat:
"""
Represents a Telegram chat (direct message, group, channel, etc.).
Attributes:
id: Unique chat identifier
title: Name of the chat (user name, group name, etc.)
username: Optional Telegram username (without @)
type: Type of chat ('user', 'group', 'channel', 'supergroup')
last_message_time: Timestamp of the most recent message in the chat
"""
id: int
title: str
username: Optional[str]
type: str
last_message_time: Optional[datetime]
@dataclass
class Contact:
"""
Represents a Telegram contact.
Attributes:
id: Unique contact identifier
username: Optional Telegram username (without @)
name: Display name of the contact
"""
id: int
username: Optional[str]
name: str
@dataclass
class MessageContext:
"""
Provides context around a specific message.
Attributes:
message: The target message
before: List of messages that came before the target message
after: List of messages that came after the target message
"""
message: Message
before: List[Message]
after: List[Message]