From f1299b3d60fe6753dd214456d44ebb9d32672c9f Mon Sep 17 00:00:00 2001 From: Muhammad18557 Date: Sat, 5 Apr 2025 16:14:14 +0800 Subject: [PATCH] database for telegram interactions --- telegram-bridge/database/__init__.py | 5 + telegram-bridge/database/base.py | 42 +++++ telegram-bridge/database/models.py | 58 ++++++ telegram-bridge/database/repositories.py | 228 +++++++++++++++++++++++ 4 files changed, 333 insertions(+) create mode 100644 telegram-bridge/database/__init__.py create mode 100644 telegram-bridge/database/base.py create mode 100644 telegram-bridge/database/models.py create mode 100644 telegram-bridge/database/repositories.py diff --git a/telegram-bridge/database/__init__.py b/telegram-bridge/database/__init__.py new file mode 100644 index 0000000..4e00d85 --- /dev/null +++ b/telegram-bridge/database/__init__.py @@ -0,0 +1,5 @@ +""" +Database module for the Telegram bridge. + +Provides ORM models, connection management, and repositories for data access. +""" \ No newline at end of file diff --git a/telegram-bridge/database/base.py b/telegram-bridge/database/base.py new file mode 100644 index 0000000..e27ef4e --- /dev/null +++ b/telegram-bridge/database/base.py @@ -0,0 +1,42 @@ +""" +Database configuration and session management. + +Provides a SQLAlchemy engine and session factory with connection pooling. +""" + +import os +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker, scoped_session +from sqlalchemy.pool import QueuePool + +from database.models import Base + +# Get database path +STORE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "store") +os.makedirs(STORE_DIR, exist_ok=True) +DB_PATH = os.path.join(STORE_DIR, "messages.db") + +# Create engine with connection pooling +engine = create_engine( + f"sqlite:///{DB_PATH}", + connect_args={"check_same_thread": False}, # Needed for SQLite + poolclass=QueuePool, + pool_size=5, + max_overflow=10, + pool_timeout=30, + pool_recycle=3600, +) + +# Create session factory +SessionFactory = sessionmaker(bind=engine) +Session = scoped_session(SessionFactory) + + +def init_db(): + """Initialize the database schema.""" + Base.metadata.create_all(engine) + + +def get_session(): + """Get a database session from the pool.""" + return Session() \ No newline at end of file diff --git a/telegram-bridge/database/models.py b/telegram-bridge/database/models.py new file mode 100644 index 0000000..357599d --- /dev/null +++ b/telegram-bridge/database/models.py @@ -0,0 +1,58 @@ +""" +Database models for the Telegram bridge. + +This module defines SQLAlchemy ORM models for storing Telegram chats and messages. +""" + +from sqlalchemy import Column, Integer, String, Text, Boolean, ForeignKey, Index, DateTime +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship +from datetime import datetime + +Base = declarative_base() + + +class Chat(Base): + """Represents a Telegram chat (direct message, group, channel, etc.).""" + + __tablename__ = "chats" + + id = Column(Integer, primary_key=True) + title = Column(String, nullable=False) + username = Column(String) + type = Column(String, nullable=False) + last_message_time = Column(DateTime) + + # Relationship with messages + messages = relationship("Message", back_populates="chat", cascade="all, delete-orphan") + + def __repr__(self): + return f"" + + +class Message(Base): + """Represents a Telegram message with all its metadata.""" + + __tablename__ = "messages" + + id = Column(Integer, primary_key=True) + chat_id = Column(Integer, ForeignKey("chats.id"), primary_key=True) + sender_id = Column(Integer) + sender_name = Column(String) + content = Column(Text) + timestamp = Column(DateTime, default=datetime.now) + is_from_me = Column(Boolean, default=False) + + # Relationship with chat + chat = relationship("Chat", back_populates="messages") + + # Indexes for improved query performance + __table_args__ = ( + Index("idx_messages_chat_id", "chat_id"), + Index("idx_messages_timestamp", "timestamp"), + Index("idx_messages_content", "content"), + Index("idx_messages_sender_id", "sender_id"), + ) + + def __repr__(self): + return f"" \ No newline at end of file diff --git a/telegram-bridge/database/repositories.py b/telegram-bridge/database/repositories.py new file mode 100644 index 0000000..c2b7e1b --- /dev/null +++ b/telegram-bridge/database/repositories.py @@ -0,0 +1,228 @@ +""" +Repository classes for database operations. + +Provides abstraction for data access operations on Telegram chats and messages. +""" + +from datetime import datetime +from typing import List, Optional, Dict, Any, Tuple +from sqlalchemy import desc, or_, and_ + +from database.base import get_session +from database.models import Chat, Message + + +class ChatRepository: + """Repository for chat operations.""" + + def store_chat( + self, + chat_id: int, + title: str, + username: Optional[str], + chat_type: str, + last_message_time: datetime, + ) -> None: + """Store a chat in the database.""" + session = get_session() + try: + chat = session.query(Chat).filter_by(id=chat_id).first() + + if chat: + # Update existing chat + chat.title = title + chat.username = username + chat.type = chat_type + chat.last_message_time = last_message_time + else: + # Create new chat + chat = Chat( + id=chat_id, + title=title, + username=username, + type=chat_type, + last_message_time=last_message_time + ) + session.add(chat) + + session.commit() + finally: + session.close() + + def get_chats( + self, + query: Optional[str] = None, + limit: int = 50, + offset: int = 0, + chat_type: Optional[str] = None, + sort_by: str = "last_message_time" + ) -> List[Chat]: + """Get chats from the database.""" + session = get_session() + try: + # Build query + db_query = session.query(Chat) + + # Apply filters + if query: + db_query = db_query.filter( + or_( + Chat.title.ilike(f"%{query}%"), + Chat.username.ilike(f"%{query}%") + ) + ) + + if chat_type: + db_query = db_query.filter(Chat.type == chat_type) + + # Apply sorting + if sort_by == "last_message_time": + db_query = db_query.order_by(desc(Chat.last_message_time)) + else: + db_query = db_query.order_by(Chat.title) + + # Apply pagination + db_query = db_query.limit(limit).offset(offset) + + return db_query.all() + finally: + session.close() + + def get_chat_by_id(self, chat_id: int) -> Optional[Chat]: + """Get a chat by its ID.""" + session = get_session() + try: + return session.query(Chat).filter_by(id=chat_id).first() + finally: + session.close() + + +class MessageRepository: + """Repository for message operations.""" + + def store_message( + self, + message_id: int, + chat_id: int, + sender_id: int, + sender_name: str, + content: str, + timestamp: datetime, + is_from_me: bool, + ) -> None: + """Store a message in the database.""" + if not content: # Skip empty messages + return + + session = get_session() + try: + message = session.query(Message).filter_by( + id=message_id, chat_id=chat_id + ).first() + + if message: + # Update existing message + message.sender_id = sender_id + message.sender_name = sender_name + message.content = content + message.timestamp = timestamp + message.is_from_me = is_from_me + else: + # Create new message + message = Message( + id=message_id, + chat_id=chat_id, + sender_id=sender_id, + sender_name=sender_name, + content=content, + timestamp=timestamp, + is_from_me=is_from_me + ) + session.add(message) + + session.commit() + finally: + session.close() + + def get_messages( + self, + chat_id: Optional[int] = None, + sender_id: Optional[int] = None, + query: Optional[str] = None, + limit: int = 50, + offset: int = 0, + date_range: Optional[Tuple[datetime, datetime]] = None, + ) -> List[Message]: + """Get messages from the database.""" + session = get_session() + try: + # Build query + db_query = session.query(Message).join(Chat) + + # Apply filters + filters = [] + + if chat_id: + filters.append(Message.chat_id == chat_id) + + if sender_id: + filters.append(Message.sender_id == sender_id) + + if query: + filters.append(Message.content.ilike(f"%{query}%")) + + if date_range: + start_date, end_date = date_range + filters.append(and_( + Message.timestamp >= start_date, + Message.timestamp <= end_date + )) + + if filters: + db_query = db_query.filter(and_(*filters)) + + # Apply sorting and pagination + db_query = db_query.order_by(desc(Message.timestamp)) + db_query = db_query.limit(limit).offset(offset) + + return db_query.all() + finally: + session.close() + + def get_message_context( + self, + message_id: int, + chat_id: int, + before: int = 5, + after: int = 5 + ) -> Dict[str, Any]: + """Get context around a specific message.""" + session = get_session() + try: + # Get the target message + target_message = session.query(Message).filter_by( + id=message_id, chat_id=chat_id + ).first() + + if not target_message: + raise ValueError(f"Message with ID {message_id} in chat {chat_id} not found") + + # Get messages before + before_messages = session.query(Message).filter( + Message.chat_id == chat_id, + Message.timestamp < target_message.timestamp + ).order_by(desc(Message.timestamp)).limit(before).all() + + # Get messages after + after_messages = session.query(Message).filter( + Message.chat_id == chat_id, + Message.timestamp > target_message.timestamp + ).order_by(Message.timestamp).limit(after).all() + + return { + "message": target_message, + "before": before_messages, + "after": after_messages + } + finally: + session.close() \ No newline at end of file