""" Database operations for retrieving and managing Telegram data. """ import sqlite3 from datetime import datetime from typing import Optional, List, Tuple from . import MESSAGES_DB_PATH from .models import Message, Chat, Contact, MessageContext def search_contacts(query: str) -> List[Contact]: """Search contacts by name or username.""" try: conn = sqlite3.connect(MESSAGES_DB_PATH) cursor = conn.cursor() # Search in chats where type is 'user' cursor.execute(""" SELECT id, username, title FROM chats WHERE type = 'user' AND (title LIKE ? OR username LIKE ?) ORDER BY title LIMIT 50 """, (f"%{query}%", f"%{query}%")) contacts = cursor.fetchall() result = [] for contact_data in contacts: contact = Contact( id=contact_data[0], username=contact_data[1], name=contact_data[2] ) result.append(contact) return result except sqlite3.Error as e: print(f"Database error: {e}") return [] finally: if 'conn' in locals(): conn.close() def list_messages( date_range: Optional[Tuple[datetime, datetime]] = None, sender_id: Optional[int] = None, chat_id: Optional[int] = None, query: Optional[str] = None, limit: int = 20, page: int = 0, include_context: bool = True, context_before: int = 1, context_after: int = 1 ) -> List[Message]: """Get messages matching the specified criteria with optional context.""" try: conn = sqlite3.connect(MESSAGES_DB_PATH) cursor = conn.cursor() # Build base query query_parts = [""" SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id FROM messages m """] query_parts.append("JOIN chats c ON m.chat_id = c.id") where_clauses = [] params = [] # Add filters if date_range: where_clauses.append("m.timestamp BETWEEN ? AND ?") params.extend([date_range[0].isoformat(), date_range[1].isoformat()]) if sender_id: where_clauses.append("m.sender_id = ?") params.append(sender_id) if chat_id: where_clauses.append("m.chat_id = ?") params.append(chat_id) if query: where_clauses.append("LOWER(m.content) LIKE LOWER(?)") params.append(f"%{query}%") if where_clauses: query_parts.append("WHERE " + " AND ".join(where_clauses)) # Add pagination offset = page * limit query_parts.append("ORDER BY m.timestamp DESC") query_parts.append("LIMIT ? OFFSET ?") params.extend([limit, offset]) cursor.execute(" ".join(query_parts), tuple(params)) messages = cursor.fetchall() result = [] for msg in messages: message = Message( id=msg[0], chat_id=msg[1], chat_title=msg[2], sender_name=msg[3], content=msg[4], timestamp=datetime.fromisoformat(msg[5]), is_from_me=bool(msg[6]), sender_id=msg[7] ) result.append(message) if include_context and result: # Add context for each message messages_with_context = [] for msg in result: context = get_message_context(msg.id, msg.chat_id, context_before, context_after) messages_with_context.extend(context.before) messages_with_context.append(context.message) messages_with_context.extend(context.after) return messages_with_context return result except sqlite3.Error as e: print(f"Database error: {e}") return [] finally: if 'conn' in locals(): conn.close() def get_message_context( message_id: int, chat_id: int, before: int = 5, after: int = 5 ) -> MessageContext: """Get context around a specific message.""" try: conn = sqlite3.connect(MESSAGES_DB_PATH) cursor = conn.cursor() # Get the target message first cursor.execute(""" SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id FROM messages m JOIN chats c ON m.chat_id = c.id WHERE m.id = ? AND m.chat_id = ? """, (message_id, chat_id)) msg_data = cursor.fetchone() if not msg_data: raise ValueError(f"Message with ID {message_id} in chat {chat_id} not found") target_message = Message( id=msg_data[0], chat_id=msg_data[1], chat_title=msg_data[2], sender_name=msg_data[3], content=msg_data[4], timestamp=datetime.fromisoformat(msg_data[5]), is_from_me=bool(msg_data[6]), sender_id=msg_data[7] ) # Get messages before cursor.execute(""" SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id FROM messages m JOIN chats c ON m.chat_id = c.id WHERE m.chat_id = ? AND m.timestamp < ? ORDER BY m.timestamp DESC LIMIT ? """, (chat_id, target_message.timestamp.isoformat(), before)) before_messages = [] for msg in cursor.fetchall(): before_messages.append(Message( id=msg[0], chat_id=msg[1], chat_title=msg[2], sender_name=msg[3], content=msg[4], timestamp=datetime.fromisoformat(msg[5]), is_from_me=bool(msg[6]), sender_id=msg[7] )) # Get messages after cursor.execute(""" SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id FROM messages m JOIN chats c ON m.chat_id = c.id WHERE m.chat_id = ? AND m.timestamp > ? ORDER BY m.timestamp ASC LIMIT ? """, (chat_id, target_message.timestamp.isoformat(), after)) after_messages = [] for msg in cursor.fetchall(): after_messages.append(Message( id=msg[0], chat_id=msg[1], chat_title=msg[2], sender_name=msg[3], content=msg[4], timestamp=datetime.fromisoformat(msg[5]), is_from_me=bool(msg[6]), sender_id=msg[7] )) return MessageContext( message=target_message, before=before_messages, after=after_messages ) except sqlite3.Error as e: print(f"Database error: {e}") raise finally: if 'conn' in locals(): conn.close() def list_chats( query: Optional[str] = None, limit: int = 20, page: int = 0, chat_type: Optional[str] = None, sort_by: str = "last_active" ) -> List[Chat]: """Get chats matching the specified criteria.""" try: conn = sqlite3.connect(MESSAGES_DB_PATH) cursor = conn.cursor() # Build base query query_parts = ["SELECT id, title, username, type, last_message_time FROM chats"] where_clauses = [] params = [] if query: where_clauses.append("(LOWER(title) LIKE LOWER(?) OR LOWER(username) LIKE LOWER(?))") params.extend([f"%{query}%", f"%{query}%"]) if chat_type: where_clauses.append("type = ?") params.append(chat_type) if where_clauses: query_parts.append("WHERE " + " AND ".join(where_clauses)) # Add sorting order_by = "last_message_time DESC" if sort_by == "last_active" else "title" query_parts.append(f"ORDER BY {order_by}") # Add pagination offset = (page) * limit query_parts.append("LIMIT ? OFFSET ?") params.extend([limit, offset]) cursor.execute(" ".join(query_parts), tuple(params)) chats = cursor.fetchall() result = [] for chat_data in chats: last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None chat = Chat( id=chat_data[0], title=chat_data[1], username=chat_data[2], type=chat_data[3], last_message_time=last_message_time ) result.append(chat) return result except sqlite3.Error as e: print(f"Database error: {e}") return [] finally: if 'conn' in locals(): conn.close() def get_chat(chat_id: int) -> Optional[Chat]: """Get chat metadata by ID.""" try: conn = sqlite3.connect(MESSAGES_DB_PATH) cursor = conn.cursor() cursor.execute(""" SELECT id, title, username, type, last_message_time FROM chats WHERE id = ? """, (chat_id,)) chat_data = cursor.fetchone() if not chat_data: return None last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None return Chat( id=chat_data[0], title=chat_data[1], username=chat_data[2], type=chat_data[3], last_message_time=last_message_time ) except sqlite3.Error as e: print(f"Database error: {e}") return None finally: if 'conn' in locals(): conn.close() def get_direct_chat_by_contact(contact_id: int) -> Optional[Chat]: """Get direct chat metadata by contact ID.""" try: conn = sqlite3.connect(MESSAGES_DB_PATH) cursor = conn.cursor() cursor.execute(""" SELECT id, title, username, type, last_message_time FROM chats WHERE id = ? AND type = 'user' """, (contact_id,)) chat_data = cursor.fetchone() if not chat_data: return None last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None return Chat( id=chat_data[0], title=chat_data[1], username=chat_data[2], type=chat_data[3], last_message_time=last_message_time ) except sqlite3.Error as e: print(f"Database error: {e}") return None finally: if 'conn' in locals(): conn.close() def get_contact_chats(contact_id: int, limit: int = 20, page: int = 0) -> List[Chat]: """Get all chats involving the contact. Args: contact_id: The contact's ID to search for limit: Maximum number of chats to return (default 20) page: Page number for pagination (default 0) """ try: conn = sqlite3.connect(MESSAGES_DB_PATH) cursor = conn.cursor() cursor.execute(""" SELECT DISTINCT c.id, c.title, c.username, c.type, c.last_message_time FROM chats c JOIN messages m ON c.id = m.chat_id WHERE m.sender_id = ? OR c.id = ? ORDER BY c.last_message_time DESC LIMIT ? OFFSET ? """, (contact_id, contact_id, limit, page * limit)) chats = cursor.fetchall() result = [] for chat_data in chats: last_message_time = datetime.fromisoformat(chat_data[4]) if chat_data[4] else None chat = Chat( id=chat_data[0], title=chat_data[1], username=chat_data[2], type=chat_data[3], last_message_time=last_message_time ) result.append(chat) return result except sqlite3.Error as e: print(f"Database error: {e}") return [] finally: if 'conn' in locals(): conn.close() def get_last_interaction(contact_id: int) -> Optional[Message]: """Get most recent message involving the contact.""" try: conn = sqlite3.connect(MESSAGES_DB_PATH) cursor = conn.cursor() cursor.execute(""" SELECT m.id, m.chat_id, c.title, m.sender_name, m.content, m.timestamp, m.is_from_me, m.sender_id FROM messages m JOIN chats c ON m.chat_id = c.id WHERE m.sender_id = ? OR c.id = ? ORDER BY m.timestamp DESC LIMIT 1 """, (contact_id, contact_id)) msg_data = cursor.fetchone() if not msg_data: return None return Message( id=msg_data[0], chat_id=msg_data[1], chat_title=msg_data[2], sender_name=msg_data[3], content=msg_data[4], timestamp=datetime.fromisoformat(msg_data[5]), is_from_me=bool(msg_data[6]), sender_id=msg_data[7] ) except sqlite3.Error as e: print(f"Database error: {e}") return None finally: if 'conn' in locals(): conn.close()