database for telegram interactions

This commit is contained in:
Muhammad18557 2025-04-05 16:14:14 +08:00
parent 922d226719
commit f1299b3d60
4 changed files with 333 additions and 0 deletions

View file

@ -0,0 +1,5 @@
"""
Database module for the Telegram bridge.
Provides ORM models, connection management, and repositories for data access.
"""

View file

@ -0,0 +1,42 @@
"""
Database configuration and session management.
Provides a SQLAlchemy engine and session factory with connection pooling.
"""
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.pool import QueuePool
from database.models import Base
# Get database path
STORE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "store")
os.makedirs(STORE_DIR, exist_ok=True)
DB_PATH = os.path.join(STORE_DIR, "messages.db")
# Create engine with connection pooling
engine = create_engine(
f"sqlite:///{DB_PATH}",
connect_args={"check_same_thread": False}, # Needed for SQLite
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_timeout=30,
pool_recycle=3600,
)
# Create session factory
SessionFactory = sessionmaker(bind=engine)
Session = scoped_session(SessionFactory)
def init_db():
"""Initialize the database schema."""
Base.metadata.create_all(engine)
def get_session():
"""Get a database session from the pool."""
return Session()

View file

@ -0,0 +1,58 @@
"""
Database models for the Telegram bridge.
This module defines SQLAlchemy ORM models for storing Telegram chats and messages.
"""
from sqlalchemy import Column, Integer, String, Text, Boolean, ForeignKey, Index, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from datetime import datetime
Base = declarative_base()
class Chat(Base):
"""Represents a Telegram chat (direct message, group, channel, etc.)."""
__tablename__ = "chats"
id = Column(Integer, primary_key=True)
title = Column(String, nullable=False)
username = Column(String)
type = Column(String, nullable=False)
last_message_time = Column(DateTime)
# Relationship with messages
messages = relationship("Message", back_populates="chat", cascade="all, delete-orphan")
def __repr__(self):
return f"<Chat(id={self.id}, title='{self.title}', type='{self.type}')>"
class Message(Base):
"""Represents a Telegram message with all its metadata."""
__tablename__ = "messages"
id = Column(Integer, primary_key=True)
chat_id = Column(Integer, ForeignKey("chats.id"), primary_key=True)
sender_id = Column(Integer)
sender_name = Column(String)
content = Column(Text)
timestamp = Column(DateTime, default=datetime.now)
is_from_me = Column(Boolean, default=False)
# Relationship with chat
chat = relationship("Chat", back_populates="messages")
# Indexes for improved query performance
__table_args__ = (
Index("idx_messages_chat_id", "chat_id"),
Index("idx_messages_timestamp", "timestamp"),
Index("idx_messages_content", "content"),
Index("idx_messages_sender_id", "sender_id"),
)
def __repr__(self):
return f"<Message(id={self.id}, chat_id={self.chat_id}, sender='{self.sender_name}')>"

View file

@ -0,0 +1,228 @@
"""
Repository classes for database operations.
Provides abstraction for data access operations on Telegram chats and messages.
"""
from datetime import datetime
from typing import List, Optional, Dict, Any, Tuple
from sqlalchemy import desc, or_, and_
from database.base import get_session
from database.models import Chat, Message
class ChatRepository:
"""Repository for chat operations."""
def store_chat(
self,
chat_id: int,
title: str,
username: Optional[str],
chat_type: str,
last_message_time: datetime,
) -> None:
"""Store a chat in the database."""
session = get_session()
try:
chat = session.query(Chat).filter_by(id=chat_id).first()
if chat:
# Update existing chat
chat.title = title
chat.username = username
chat.type = chat_type
chat.last_message_time = last_message_time
else:
# Create new chat
chat = Chat(
id=chat_id,
title=title,
username=username,
type=chat_type,
last_message_time=last_message_time
)
session.add(chat)
session.commit()
finally:
session.close()
def get_chats(
self,
query: Optional[str] = None,
limit: int = 50,
offset: int = 0,
chat_type: Optional[str] = None,
sort_by: str = "last_message_time"
) -> List[Chat]:
"""Get chats from the database."""
session = get_session()
try:
# Build query
db_query = session.query(Chat)
# Apply filters
if query:
db_query = db_query.filter(
or_(
Chat.title.ilike(f"%{query}%"),
Chat.username.ilike(f"%{query}%")
)
)
if chat_type:
db_query = db_query.filter(Chat.type == chat_type)
# Apply sorting
if sort_by == "last_message_time":
db_query = db_query.order_by(desc(Chat.last_message_time))
else:
db_query = db_query.order_by(Chat.title)
# Apply pagination
db_query = db_query.limit(limit).offset(offset)
return db_query.all()
finally:
session.close()
def get_chat_by_id(self, chat_id: int) -> Optional[Chat]:
"""Get a chat by its ID."""
session = get_session()
try:
return session.query(Chat).filter_by(id=chat_id).first()
finally:
session.close()
class MessageRepository:
"""Repository for message operations."""
def store_message(
self,
message_id: int,
chat_id: int,
sender_id: int,
sender_name: str,
content: str,
timestamp: datetime,
is_from_me: bool,
) -> None:
"""Store a message in the database."""
if not content: # Skip empty messages
return
session = get_session()
try:
message = session.query(Message).filter_by(
id=message_id, chat_id=chat_id
).first()
if message:
# Update existing message
message.sender_id = sender_id
message.sender_name = sender_name
message.content = content
message.timestamp = timestamp
message.is_from_me = is_from_me
else:
# Create new message
message = Message(
id=message_id,
chat_id=chat_id,
sender_id=sender_id,
sender_name=sender_name,
content=content,
timestamp=timestamp,
is_from_me=is_from_me
)
session.add(message)
session.commit()
finally:
session.close()
def get_messages(
self,
chat_id: Optional[int] = None,
sender_id: Optional[int] = None,
query: Optional[str] = None,
limit: int = 50,
offset: int = 0,
date_range: Optional[Tuple[datetime, datetime]] = None,
) -> List[Message]:
"""Get messages from the database."""
session = get_session()
try:
# Build query
db_query = session.query(Message).join(Chat)
# Apply filters
filters = []
if chat_id:
filters.append(Message.chat_id == chat_id)
if sender_id:
filters.append(Message.sender_id == sender_id)
if query:
filters.append(Message.content.ilike(f"%{query}%"))
if date_range:
start_date, end_date = date_range
filters.append(and_(
Message.timestamp >= start_date,
Message.timestamp <= end_date
))
if filters:
db_query = db_query.filter(and_(*filters))
# Apply sorting and pagination
db_query = db_query.order_by(desc(Message.timestamp))
db_query = db_query.limit(limit).offset(offset)
return db_query.all()
finally:
session.close()
def get_message_context(
self,
message_id: int,
chat_id: int,
before: int = 5,
after: int = 5
) -> Dict[str, Any]:
"""Get context around a specific message."""
session = get_session()
try:
# Get the target message
target_message = session.query(Message).filter_by(
id=message_id, chat_id=chat_id
).first()
if not target_message:
raise ValueError(f"Message with ID {message_id} in chat {chat_id} not found")
# Get messages before
before_messages = session.query(Message).filter(
Message.chat_id == chat_id,
Message.timestamp < target_message.timestamp
).order_by(desc(Message.timestamp)).limit(before).all()
# Get messages after
after_messages = session.query(Message).filter(
Message.chat_id == chat_id,
Message.timestamp > target_message.timestamp
).order_by(Message.timestamp).limit(after).all()
return {
"message": target_message,
"before": before_messages,
"after": after_messages
}
finally:
session.close()