Spaces:
Runtime error
Runtime error
| # main.py - Updated Version with All Fixes | |
| from fastapi import FastAPI, Depends, HTTPException, status, UploadFile, File, Form, Header, BackgroundTasks | |
| from fastapi.responses import FileResponse, StreamingResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, EmailStr | |
| from typing import Optional, List | |
| from datetime import datetime, timedelta, timezone | |
| import jwt | |
| import bcrypt | |
| import sqlite3 | |
| import os | |
| import uuid | |
| import smtplib | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| import sys | |
| from dotenv import load_dotenv | |
| import asyncio | |
| from concurrent.futures import ThreadPoolExecutor | |
| import secrets | |
| import httpx | |
| import requests | |
| # In-memory storage for OTPs (for MVP) | |
| otp_storage = {} | |
| # Setup paths | |
| from process_pdf import process_new_pdf | |
| # Thread pool for Qdrant operations | |
| executor = ThreadPoolExecutor(max_workers=3) | |
| sys.stdout.reconfigure(encoding='utf-8') | |
| # Load environment variables | |
| load_dotenv() | |
| # ======================= | |
| # Configuration | |
| # ======================= | |
| SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-change-in-production") | |
| ALGORITHM = "HS256" | |
| ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours | |
| UPLOAD_DIR = "uploads" | |
| LECTURES_DIR = "lectures" | |
| DB_PATH = "university_chatbot.db" | |
| # Email Configuration | |
| BREVO_API_KEY = os.getenv("BREVO_API_KEY") | |
| SENDER_EMAIL = "universityai.com@gmail.com" | |
| app = FastAPI(title="University AI Chatbot API with Courses") | |
| # OAuth2 Scheme | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login", auto_error=False) | |
| # CORS | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Serve only the chat image securely instead of the whole backend folder | |
| async def get_chat_image(): | |
| return FileResponse("ch.png") | |
| # ======================= | |
| # Database Setup with Auto-Migration | |
| # ======================= | |
| def init_db(): | |
| # Ensure data directory exists to prevent crash | |
| db_dir = os.path.dirname(DB_PATH) | |
| if db_dir: | |
| os.makedirs(db_dir, exist_ok=True) | |
| conn = sqlite3.connect(DB_PATH) | |
| c = conn.cursor() | |
| print("\n" + "="*60) | |
| print("π Initializing Database...") | |
| print("="*60 + "\n") | |
| # Users table | |
| c.execute('''CREATE TABLE IF NOT EXISTS users ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| email TEXT NOT NULL UNIQUE, | |
| password_hash TEXT NOT NULL, | |
| role TEXT NOT NULL, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| )''') | |
| # Courses table | |
| c.execute('''CREATE TABLE IF NOT EXISTS courses ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| name TEXT UNIQUE NOT NULL, | |
| description TEXT, | |
| admin_id INTEGER NOT NULL, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| FOREIGN KEY (admin_id) REFERENCES users(id) | |
| )''') | |
| # Conversations table | |
| c.execute('''CREATE TABLE IF NOT EXISTS conversations ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id INTEGER NOT NULL, | |
| title TEXT NOT NULL, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| is_deleted INTEGER DEFAULT 0, | |
| FOREIGN KEY (user_id) REFERENCES users(id) | |
| )''') | |
| # Messages table - NOW WITH BOTH sender AND role | |
| c.execute('''CREATE TABLE IF NOT EXISTS messages ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| conversation_id INTEGER NOT NULL, | |
| sender TEXT NOT NULL, | |
| role TEXT NOT NULL, | |
| content TEXT NOT NULL, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| FOREIGN KEY (conversation_id) REFERENCES conversations(id) | |
| )''') | |
| # Feedbacks table | |
| c.execute('''CREATE TABLE IF NOT EXISTS feedbacks ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| message_id INTEGER NOT NULL, | |
| user_id INTEGER NOT NULL, | |
| feedback_type TEXT NOT NULL, | |
| comment TEXT, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE, | |
| FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, | |
| UNIQUE(message_id, user_id) | |
| )''') | |
| # Files table | |
| c.execute('''CREATE TABLE IF NOT EXISTS files ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id INTEGER NOT NULL, | |
| filename TEXT NOT NULL, | |
| filepath TEXT NOT NULL, | |
| file_type TEXT NOT NULL, | |
| subject TEXT, | |
| uploaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| FOREIGN KEY (user_id) REFERENCES users(id) | |
| )''') | |
| # Lectures table | |
| c.execute('''CREATE TABLE IF NOT EXISTS lectures ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| admin_id INTEGER NOT NULL, | |
| filename TEXT NOT NULL, | |
| filepath TEXT NOT NULL, | |
| subject TEXT, | |
| uploaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| processing_status TEXT DEFAULT 'pending', | |
| total_chunks INTEGER DEFAULT 0, | |
| total_characters INTEGER DEFAULT 0, | |
| error_message TEXT, | |
| FOREIGN KEY (admin_id) REFERENCES users(id) | |
| )''') | |
| # Password reset tokens table | |
| c.execute('''CREATE TABLE IF NOT EXISTS password_reset_tokens ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id INTEGER NOT NULL, | |
| token TEXT UNIQUE NOT NULL, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| expires_at TEXT NOT NULL, | |
| used INTEGER DEFAULT 0, | |
| FOREIGN KEY (user_id) REFERENCES users(id) | |
| )''') | |
| conn.commit() | |
| # π§ AUTO-MIGRATION - Add missing columns | |
| print("π Checking for missing columns...") | |
| # π₯ CRITICAL FIX: Add 'role' column to messages table | |
| c.execute("PRAGMA table_info(messages)") | |
| message_columns = {col[1] for col in c.fetchall()} | |
| if 'role' not in message_columns: | |
| try: | |
| print(" π§ Adding 'role' column to messages table...") | |
| c.execute("ALTER TABLE messages ADD COLUMN role TEXT DEFAULT 'user'") | |
| # Migrate existing data: map sender -> role | |
| c.execute("UPDATE messages SET role = CASE WHEN sender = 'ai' THEN 'assistant' ELSE 'user' END") | |
| conn.commit() | |
| print(" β Added column: role to messages (migrated existing data)") | |
| except sqlite3.OperationalError as e: | |
| if "duplicate column" not in str(e).lower(): | |
| print(f" β οΈ Error adding role: {e}") | |
| else: | |
| # Ensure existing role data is correct | |
| try: | |
| c.execute("UPDATE messages SET role = CASE WHEN sender = 'ai' THEN 'assistant' ELSE 'user' END WHERE role IS NULL OR role = ''") | |
| conn.commit() | |
| print(" β Role column exists and data verified") | |
| except Exception as e: | |
| print(f" β οΈ Error verifying role data: {e}") | |
| # Check lectures table columns | |
| c.execute("PRAGMA table_info(lectures)") | |
| existing_columns = {col[1] for col in c.fetchall()} | |
| required_columns = { | |
| 'processing_status': "TEXT DEFAULT 'pending'", | |
| 'total_chunks': "INTEGER DEFAULT 0", | |
| 'total_characters': "INTEGER DEFAULT 0", | |
| 'error_message': "TEXT" | |
| } | |
| for col_name, col_type in required_columns.items(): | |
| if col_name not in existing_columns: | |
| try: | |
| c.execute(f"ALTER TABLE lectures ADD COLUMN {col_name} {col_type}") | |
| conn.commit() | |
| print(f" β Added column: {col_name}") | |
| except sqlite3.OperationalError as e: | |
| if "duplicate column" not in str(e).lower(): | |
| print(f" β οΈ Error adding {col_name}: {e}") | |
| # Check conversations table for is_deleted | |
| c.execute("PRAGMA table_info(conversations)") | |
| conv_columns = {col[1] for col in c.fetchall()} | |
| if 'is_deleted' not in conv_columns: | |
| try: | |
| c.execute("ALTER TABLE conversations ADD COLUMN is_deleted INTEGER DEFAULT 0") | |
| conn.commit() | |
| print(" β Added column: is_deleted to conversations") | |
| except Exception as e: | |
| print(f" β οΈ Error adding is_deleted: {e}") | |
| # Seed admin user | |
| admin_email = "admin@university.edu" | |
| admin_password = "Admin123" | |
| hashed = bcrypt.hashpw(admin_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') | |
| try: | |
| c.execute("SELECT id FROM users WHERE email = ?", (admin_email,)) | |
| existing_admin = c.fetchone() | |
| if not existing_admin: | |
| c.execute("INSERT INTO users (email, password_hash, role) VALUES (?, ?, ?)", | |
| (admin_email, hashed, 'admin')) | |
| conn.commit() | |
| admin_id = c.lastrowid | |
| print(f"\nβ Admin user created: {admin_email}") | |
| print(f" Password: {admin_password}") | |
| else: | |
| admin_id = existing_admin[0] | |
| print(f"\nβΉοΈ Admin user already exists: {admin_email}") | |
| # Seed 12 Fixed Courses | |
| fixed_courses = [ | |
| ("Android Development", "Basics of CS and programming"), | |
| ("Computer Networks", "Fundamental data structures and algorithms"), | |
| ("Information Security", "SQL, NoSQL, and database design"), | |
| ("Operating Systems", "Process management, memory, and concurrency"), | |
| ("Theory of Computation", "OSI model, TCP/IP, and network security"), | |
| ("Algorithms Design and Analysis", "SDLC, agile, and design patterns"), | |
| ("Computer Architecture", "Search, logic, and probabilistic reasoning"), | |
| ("Machine Learning", "Supervised and unsupervised learning"), | |
| ("Compiler Design", "HTML, CSS, JavaScript, and backend frameworks"), | |
| ("Computer Graphics", "Network security, cryptography, and ethical hacking"), | |
| ("Human Computer Interaction", "AWS, Azure, and cloud architecture") | |
| ] | |
| print("\nπ± Seeding fixed courses...") | |
| for name, desc in fixed_courses: | |
| c.execute("SELECT id FROM courses WHERE name = ?", (name,)) | |
| if not c.fetchone(): | |
| c.execute("INSERT INTO courses (name, description, admin_id) VALUES (?, ?, ?)", | |
| (name, desc, admin_id)) | |
| print(f" β Added course: {name}") | |
| conn.commit() | |
| except Exception as e: | |
| print(f"β Error seeding data: {e}") | |
| conn.close() | |
| os.makedirs(UPLOAD_DIR, exist_ok=True) | |
| os.makedirs(LECTURES_DIR, exist_ok=True) | |
| print("\n" + "="*60) | |
| print("β Database initialization completed!") | |
| print("="*60 + "\n") | |
| init_db() | |
| # ======================= | |
| # Helper Functions | |
| # ======================= | |
| def get_db(): | |
| conn = sqlite3.connect(DB_PATH, check_same_thread=False) | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| def create_access_token(data: dict) -> str: | |
| to_encode = data.copy() | |
| expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) | |
| to_encode.update({"exp": expire}) | |
| token = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
| return token | |
| def verify_token(token: str): | |
| try: | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| return payload | |
| except jwt.ExpiredSignatureError: | |
| raise HTTPException(status_code=401, detail="Token expired") | |
| except jwt.InvalidTokenError: | |
| raise HTTPException(status_code=401, detail="Invalid token") | |
| def get_current_user( | |
| authorization: Optional[str] = Header(None), | |
| token: Optional[str] = Depends(oauth2_scheme) | |
| ): | |
| raw_token = None | |
| if authorization and authorization.startswith("Bearer "): | |
| raw_token = authorization.split(" ")[1] | |
| elif token: | |
| raw_token = token | |
| if not raw_token: | |
| raise HTTPException(status_code=401, detail="Authentication required") | |
| payload = verify_token(raw_token) | |
| user_id = payload.get("user_id") | |
| role = payload.get("role") | |
| if not user_id: | |
| raise HTTPException(status_code=401, detail="Invalid token") | |
| return {"user_id": user_id, "role": role} | |
| def send_email(to_email: str, subject: str, html_content: str): | |
| """Sends an email using Brevo API (HTTP POST).""" | |
| if not BREVO_API_KEY: | |
| print("β οΈ Email configuration missing (BREVO_API_KEY). Skipping email.") | |
| return | |
| if not BREVO_API_KEY.startswith("xkeysib-"): | |
| print("β οΈ Error: The provided BREVO_API_KEY looks like an SMTP password (starts with 'xsmtpsib').") | |
| print("π Please generate a new API Key from Brevo Dashboard -> SMTP & API -> API Keys (it should start with 'xkeysib').") | |
| return | |
| endpoint = "https://api.brevo.com/v3/smtp/email" | |
| headers = { | |
| "api-key": BREVO_API_KEY, | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "sender": {"name": "University AI", "email": SENDER_EMAIL}, | |
| "to": [{"email": to_email}], | |
| "subject": subject, | |
| "htmlContent": html_content | |
| } | |
| try: | |
| response = requests.post(endpoint, json=payload, headers=headers, timeout=10) | |
| if response.status_code == 201 or response.status_code == 200: | |
| print(f"β Transaction Successful: Email queued for {to_email}") | |
| else: | |
| print(f"β API Error {response.status_code}: {response.text}") | |
| except Exception as e: | |
| print(f"β οΈ Infrastructure Failure: {str(e)}") | |
| # ======================= | |
| # Pydantic Models | |
| # ======================= | |
| class UserRegister(BaseModel): | |
| email: EmailStr | |
| password: str | |
| class ForgotPasswordRequest(BaseModel): | |
| email: EmailStr | |
| class ResetPasswordRequest(BaseModel): | |
| email: EmailStr | |
| token: str | |
| new_password: str | |
| class Token(BaseModel): | |
| access_token: str | |
| token_type: str | |
| role: str | |
| class ChatMessage(BaseModel): | |
| conversation_id: Optional[int] = None | |
| message: str | |
| class FeedbackRequest(BaseModel): | |
| message_id: int | |
| feedback_type: str | |
| comment: Optional[str] = None | |
| class CourseCreate(BaseModel): | |
| name: str | |
| description: Optional[str] = None | |
| # ======================= | |
| # Root Endpoint | |
| # ======================= | |
| # Serve the Login page by default | |
| async def root(): | |
| return FileResponse("index.html") | |
| # Serve HTML Pages | |
| async def index_page(): | |
| return FileResponse("index.html") | |
| async def login_page(): | |
| return FileResponse("login.html") | |
| async def chat_page(): | |
| return FileResponse("chat.html") | |
| async def admin_page(): | |
| return FileResponse("Admin-Dashboard.html") | |
| async def register_page(): | |
| return FileResponse("register.html") | |
| async def forgot_password_page(): | |
| return FileResponse("forgot-password.html") | |
| async def reset_password_page(): | |
| return FileResponse("reset-password.html") | |
| async def verify_email_page(): | |
| return FileResponse("verify-email.html") | |
| # ======================= | |
| # Auth Endpoints | |
| # ======================= | |
| async def register(background_tasks: BackgroundTasks, email: str = Form(...), password: str = Form(...)): | |
| conn = get_db() | |
| c = conn.cursor() | |
| c.execute("SELECT id FROM users WHERE email = ?", (email,)) | |
| if c.fetchone(): | |
| conn.close() | |
| raise HTTPException(status_code=400, detail="Email already registered") | |
| hashed_pw = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() | |
| c.execute("INSERT INTO users (email, password_hash, role) VALUES (?, ?, ?)", | |
| (email, hashed_pw, "student")) | |
| conn.commit() | |
| conn.close() | |
| # Send Welcome Email | |
| subject = "Welcome to University AI! π" | |
| html_content = f""" | |
| <html><body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;"> | |
| <div style="max-width: 600px; margin: 0 auto; padding: 20px; border: 1px solid #ddd; border-radius: 10px;"> | |
| <h2 style="color: #3A662A; text-align: center;">Welcome to University AI! π</h2> | |
| <p>Hi there,</p> | |
| <p>Thank you for joining University AI. We are excited to have you on board!</p> | |
| <p>You can now log in and start chatting with our AI assistant.</p> | |
| <hr style="border: none; border-top: 1px solid #ddd; margin: 20px 0;"> | |
| <p style="font-size: 12px; color: #999; text-align: center;">University AI Team<br>Your Learning Partner</p> | |
| </div> | |
| </body></html>""" | |
| background_tasks.add_task(send_email, email, subject, html_content) | |
| return { | |
| "message": "Account created successfully. Redirecting to login...", | |
| "email": email | |
| } | |
| async def login(data: UserRegister): | |
| conn = get_db() | |
| c = conn.cursor() | |
| c.execute("SELECT * FROM users WHERE email = ?", (data.email,)) | |
| user = c.fetchone() | |
| conn.close() | |
| if not user: | |
| raise HTTPException(status_code=401, detail="Invalid email or password") | |
| if not bcrypt.checkpw(data.password.encode(), user["password_hash"].encode()): | |
| raise HTTPException(status_code=401, detail="Invalid email or password") | |
| access = create_access_token({ | |
| "user_id": user["id"], | |
| "role": user["role"] | |
| }) | |
| print(f"β User logged in: {data.email} ({user['role']})") | |
| return { | |
| "access_token": access, | |
| "token_type": "bearer", | |
| "role": user["role"] | |
| } | |
| async def forgot_password(request: ForgotPasswordRequest): | |
| """Handles forgot password request""" | |
| conn = get_db() | |
| c = conn.cursor() | |
| c.execute("SELECT id FROM users WHERE email = ?", (request.email,)) | |
| user = c.fetchone() | |
| conn.close() | |
| if user: | |
| otp = f"{secrets.randbelow(1000000):06d}" | |
| otp_storage[request.email] = { | |
| "code": otp, | |
| "timestamp": datetime.now(timezone.utc) | |
| } | |
| subject = "Reset Your Password - University AI π" | |
| html_content = f""" | |
| <html><body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;"> | |
| <div style="max-width: 600px; margin: 0 auto; padding: 20px; border: 1px solid #ddd; border-radius: 10px;"> | |
| <h2 style="color: #3A662A; text-align: center;">Password Reset Request</h2> | |
| <p>Hi,</p> | |
| <p>We received a request to reset your password. Use this code:</p> | |
| <div style="background: #f5f5f5; padding: 20px; text-align: center; border-radius: 8px; margin: 20px 0;"> | |
| <h1 style="letter-spacing: 8px; font-size: 36px; margin: 10px 0; color: #3A662A;">{otp}</h1> | |
| <p style="margin: 10px 0 0 0; font-size: 12px; color: #999;">Expires in 5 minutes</p> | |
| </div> | |
| <p style="font-size: 14px; color: #666;">If you didn't request this, ignore this email.</p> | |
| <p style="font-size: 12px; color: #999; text-align: center;">University AI Security Team</p> | |
| </div> | |
| </body></html>""" | |
| send_email(request.email, subject, html_content) | |
| print(f"π Password reset code sent to: {request.email}") | |
| return {"message": "If account exists, password reset code has been sent."} | |
| async def reset_password(request: ResetPasswordRequest): | |
| """Resets user password""" | |
| conn = get_db() | |
| c = conn.cursor() | |
| # Verify OTP | |
| stored_otp = otp_storage.get(request.email) | |
| if not stored_otp or stored_otp["code"] != request.token: | |
| conn.close() | |
| raise HTTPException(status_code=400, detail="Invalid or incorrect code.") | |
| # Check TTL | |
| if datetime.now(timezone.utc) - stored_otp["timestamp"] > timedelta(minutes=5): | |
| del otp_storage[request.email] | |
| conn.close() | |
| raise HTTPException(status_code=400, detail="Code has expired.") | |
| # Burn OTP | |
| del otp_storage[request.email] | |
| c.execute("SELECT id FROM users WHERE email = ?", (request.email,)) | |
| user = c.fetchone() | |
| if not user: | |
| conn.close() | |
| raise HTTPException(status_code=404, detail="User not found.") | |
| new_hashed_pw = bcrypt.hashpw(request.new_password.encode(), bcrypt.gensalt()).decode() | |
| c.execute("UPDATE users SET password_hash = ? WHERE id = ?", (new_hashed_pw, user["id"])) | |
| conn.commit() | |
| conn.close() | |
| print(f"π Password reset successful for: {request.email}") | |
| return {"message": "Password reset successful. You can now login with your new password."} | |
| # ======================= | |
| # Student Endpoints - FIXED | |
| # ======================= | |
| async def get_conversations(current_user: dict = Depends(get_current_user)): | |
| """Get all conversations with first message for title generation""" | |
| if current_user['role'] != 'student': | |
| raise HTTPException(status_code=403, detail="Access denied") | |
| conn = get_db() | |
| c = conn.cursor() | |
| try: | |
| c.execute(""" | |
| SELECT | |
| c.id, | |
| c.title, | |
| c.created_at, | |
| (SELECT content | |
| FROM messages m | |
| WHERE m.conversation_id = c.id | |
| AND (m.role = 'user' OR m.sender = 'user') | |
| ORDER BY m.created_at ASC | |
| LIMIT 1) as first_message | |
| FROM conversations c | |
| WHERE c.user_id = ? AND c.is_deleted = 0 | |
| ORDER BY c.created_at DESC | |
| """, (current_user['user_id'],)) | |
| conversations = [] | |
| for row in c.fetchall(): | |
| conv_dict = dict(row) | |
| conversations.append(conv_dict) | |
| conn.close() | |
| return {"conversations": conversations} | |
| except Exception as e: | |
| conn.close() | |
| print(f"β Error loading conversations: {e}") | |
| raise HTTPException(status_code=500, detail=f"Error loading conversations: {str(e)}") | |
| async def get_conversation(conversation_id: int, current_user: dict = Depends(get_current_user)): | |
| """Get conversation with all messages - handles both old and new format""" | |
| if current_user['role'] != 'student': | |
| raise HTTPException(status_code=403, detail="Access denied") | |
| conn = get_db() | |
| c = conn.cursor() | |
| try: | |
| # Verify conversation belongs to user | |
| c.execute("SELECT * FROM conversations WHERE id = ? AND user_id = ? AND is_deleted = 0", | |
| (conversation_id, current_user['user_id'])) | |
| conversation = c.fetchone() | |
| if not conversation: | |
| conn.close() | |
| raise HTTPException(status_code=404, detail="Conversation not found") | |
| # Get all messages with both sender and role | |
| c.execute(""" | |
| SELECT id, conversation_id, sender, role, content, created_at | |
| FROM messages | |
| WHERE conversation_id = ? | |
| ORDER BY created_at ASC | |
| """, (conversation_id,)) | |
| messages = [] | |
| for row in c.fetchall(): | |
| msg = dict(row) | |
| # Ensure role is set correctly (backward compatibility) | |
| if not msg.get('role') or msg['role'] == '': | |
| if msg['sender'] == 'ai': | |
| msg['role'] = 'assistant' | |
| else: | |
| msg['role'] = 'user' | |
| messages.append(msg) | |
| conn.close() | |
| print(f"β Loaded conversation {conversation_id} with {len(messages)} messages") | |
| return { | |
| "conversation": dict(conversation), | |
| "messages": messages | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| conn.close() | |
| print(f"β Error loading conversation: {e}") | |
| raise HTTPException(status_code=500, detail=f"Error loading conversation: {str(e)}") | |
| async def delete_conversation( | |
| conversation_id: int, | |
| current_user: dict = Depends(get_current_user) | |
| ): | |
| """Delete a conversation and its messages""" | |
| if current_user['role'] != 'student': | |
| raise HTTPException(status_code=403, detail="Access denied") | |
| conn = get_db() | |
| c = conn.cursor() | |
| try: | |
| # Verify ownership | |
| c.execute("SELECT id FROM conversations WHERE id = ? AND user_id = ?", | |
| (conversation_id, current_user['user_id'])) | |
| conversation = c.fetchone() | |
| if not conversation: | |
| conn.close() | |
| raise HTTPException(status_code=404, detail="Conversation not found") | |
| # Soft delete conversation (Hide from user, keep for admin/feedback) | |
| c.execute("UPDATE conversations SET is_deleted = 1 WHERE id = ?", (conversation_id,)) | |
| conn.commit() | |
| conn.close() | |
| print(f"ποΈ Deleted conversation {conversation_id}") | |
| return { | |
| "success": True, | |
| "message": "Conversation deleted successfully" | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| conn.close() | |
| print(f"β Error deleting conversation: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def chat(data: ChatMessage, current_user: dict = Depends(get_current_user)): | |
| """Send message and get AI response - NOW SAVES BOTH sender AND role""" | |
| if current_user['role'] != 'student': | |
| raise HTTPException(status_code=403, detail="Access denied") | |
| conn = get_db() | |
| c = conn.cursor() | |
| try: | |
| # Create new conversation if needed | |
| if data.conversation_id is None: | |
| # Generate title from first 3 words | |
| title_words = data.message.split()[:3] | |
| title = " ".join(title_words) | |
| if len(data.message.split()) > 3: | |
| title += "..." | |
| c.execute("INSERT INTO conversations (user_id, title) VALUES (?, ?)", | |
| (current_user['user_id'], title)) | |
| conversation_id = c.lastrowid | |
| print(f"β Created new conversation: {conversation_id} - '{title}'") | |
| else: | |
| conversation_id = data.conversation_id | |
| # π₯ CRITICAL: Save user message with BOTH sender AND role | |
| c.execute( | |
| "INSERT INTO messages (conversation_id, sender, role, content) VALUES (?, ?, ?, ?)", | |
| (conversation_id, 'user', 'user', data.message) | |
| ) | |
| user_message_id = c.lastrowid | |
| conn.commit() | |
| print(f"π¬ User message saved (ID: {user_message_id})") | |
| # 1. Pre-create AI message with empty content to get an ID | |
| c.execute( | |
| "INSERT INTO messages (conversation_id, sender, role, content) VALUES (?, ?, ?, ?)", | |
| (conversation_id, 'ai', 'assistant', '') | |
| ) | |
| ai_message_id = c.lastrowid | |
| conn.commit() | |
| conn.close() # Close main connection, we will open a new one for update | |
| # 2. Define Generator for Streaming | |
| async def response_generator(): | |
| full_response = "" | |
| RAG_URL = "http://127.0.0.1:8001/ask_stream" | |
| try: | |
| async with httpx.AsyncClient(timeout=60.0) as client: | |
| async with client.stream("POST", RAG_URL, json={"question": data.message, "conversation_id": conversation_id}) as r: | |
| async for chunk in r.aiter_text(): | |
| full_response += chunk | |
| yield chunk | |
| except Exception as e: | |
| error_msg = f"Error: {str(e)}" | |
| full_response += error_msg | |
| yield error_msg | |
| # 3. Update DB with full response after stream ends | |
| try: | |
| # Must create new connection in async generator | |
| update_conn = sqlite3.connect(DB_PATH) | |
| update_c = update_conn.cursor() | |
| update_c.execute("UPDATE messages SET content = ? WHERE id = ?", (full_response, ai_message_id)) | |
| update_conn.commit() | |
| update_conn.close() | |
| print(f"π€ AI message updated (ID: {ai_message_id})") | |
| except Exception as e: | |
| print(f"β Error updating DB: {e}") | |
| # 4. Return Streaming Response with IDs in headers | |
| return StreamingResponse( | |
| response_generator(), | |
| media_type="text/plain", | |
| headers={ | |
| "X-Conversation-Id": str(conversation_id), | |
| "X-Message-Id": str(ai_message_id) | |
| } | |
| ) | |
| except Exception as e: | |
| conn.rollback() | |
| print(f"β Error in chat endpoint: {e}") | |
| raise HTTPException(status_code=500, detail=f"Chat error: {str(e)}") | |
| # ======================= | |
| # Feedback Endpoints | |
| # ======================= | |
| async def submit_feedback( | |
| feedback: FeedbackRequest, | |
| current_user: dict = Depends(get_current_user) | |
| ): | |
| """Submit or update feedback for a message""" | |
| conn = get_db() | |
| c = conn.cursor() | |
| try: | |
| # Check if feedback exists | |
| c.execute( | |
| "SELECT id FROM feedbacks WHERE message_id = ? AND user_id = ?", | |
| (feedback.message_id, current_user['user_id']) | |
| ) | |
| existing = c.fetchone() | |
| if existing: | |
| # Update existing | |
| c.execute( | |
| """UPDATE feedbacks | |
| SET feedback_type = ?, comment = ?, created_at = CURRENT_TIMESTAMP | |
| WHERE id = ?""", | |
| (feedback.feedback_type, feedback.comment, existing['id']) | |
| ) | |
| print(f"β Updated feedback for message {feedback.message_id}") | |
| else: | |
| # Create new | |
| c.execute( | |
| """INSERT INTO feedbacks (message_id, user_id, feedback_type, comment) | |
| VALUES (?, ?, ?, ?)""", | |
| (feedback.message_id, current_user['user_id'], | |
| feedback.feedback_type, feedback.comment) | |
| ) | |
| print(f"β Created feedback for message {feedback.message_id}") | |
| conn.commit() | |
| conn.close() | |
| return { | |
| "success": True, | |
| "message": "Feedback submitted successfully" | |
| } | |
| except Exception as e: | |
| conn.close() | |
| print(f"β Error submitting feedback: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_all_feedbacks(current_user: dict = Depends(get_current_user)): | |
| """Get all feedback with details""" | |
| if current_user["role"] != "admin": | |
| raise HTTPException(status_code=403, detail="Admin access required") | |
| conn = get_db() | |
| c = conn.cursor() | |
| try: | |
| c.execute(""" | |
| SELECT | |
| f.id, | |
| f.message_id, | |
| f.feedback_type, | |
| f.comment, | |
| f.created_at, | |
| m.content as message_content, | |
| m.sender as message_sender, | |
| m.role as message_role, | |
| u.email as user_email, | |
| c.title as conversation_title, | |
| c.id as conversation_id | |
| FROM feedbacks f | |
| JOIN messages m ON f.message_id = m.id | |
| JOIN users u ON f.user_id = u.id | |
| JOIN conversations c ON m.conversation_id = c.id | |
| ORDER BY f.created_at DESC | |
| """) | |
| feedbacks = [dict(row) for row in c.fetchall()] | |
| conn.close() | |
| return {"feedbacks": feedbacks} | |
| except Exception as e: | |
| conn.close() | |
| print(f"β Error fetching feedbacks: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ======================= | |
| # Admin Endpoints - Courses | |
| # ======================= | |
| async def create_course( | |
| course: CourseCreate, | |
| current_user: dict = Depends(get_current_user) | |
| ): | |
| """Create a new course""" | |
| if current_user["role"] != "admin": | |
| raise HTTPException(status_code=403, detail="Admin access required") | |
| conn = get_db() | |
| c = conn.cursor() | |
| try: | |
| # Check if exists | |
| c.execute("SELECT id FROM courses WHERE name = ?", (course.name,)) | |
| if c.fetchone(): | |
| conn.close() | |
| raise HTTPException(status_code=400, detail="Course already exists") | |
| # Insert | |
| c.execute( | |
| "INSERT INTO courses (name, description, admin_id) VALUES (?, ?, ?)", | |
| (course.name, course.description, current_user['user_id']) | |
| ) | |
| course_id = c.lastrowid | |
| conn.commit() | |
| conn.close() | |
| print(f"β Course created: {course.name} (ID: {course_id})") | |
| return { | |
| "success": True, | |
| "message": "Course created successfully", | |
| "course_id": course_id, | |
| "name": course.name | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| conn.close() | |
| print(f"β Error creating course: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_courses(current_user: dict = Depends(get_current_user)): | |
| """Get all courses with lecture counts""" | |
| if current_user["role"] != "admin": | |
| raise HTTPException(status_code=403, detail="Admin access required") | |
| conn = get_db() | |
| c = conn.cursor() | |
| try: | |
| c.execute(""" | |
| SELECT | |
| c.id, | |
| c.name, | |
| c.description, | |
| c.created_at, | |
| COUNT(l.id) as lecture_count | |
| FROM courses c | |
| LEFT JOIN lectures l ON c.name = l.subject | |
| GROUP BY c.id | |
| ORDER BY c.created_at DESC | |
| """) | |
| courses = [] | |
| for row in c.fetchall(): | |
| courses.append({ | |
| "id": row[0], | |
| "name": row[1], | |
| "description": row[2], | |
| "created_at": row[3], | |
| "lecture_count": row[4] | |
| }) | |
| conn.close() | |
| return {"courses": courses} | |
| except Exception as e: | |
| conn.close() | |
| print(f"β Error fetching courses: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_course_lectures( | |
| course_name: str, | |
| current_user: dict = Depends(get_current_user) | |
| ): | |
| """Get lectures for a course""" | |
| if current_user["role"] != "admin": | |
| raise HTTPException(status_code=403, detail="Admin access required") | |
| conn = get_db() | |
| c = conn.cursor() | |
| try: | |
| c.execute(""" | |
| SELECT id, filename, filepath, subject, uploaded_at, | |
| processing_status, total_chunks, total_characters | |
| FROM lectures | |
| WHERE subject = ? | |
| ORDER BY uploaded_at DESC | |
| """, (course_name,)) | |
| lectures = [dict(row) for row in c.fetchall()] | |
| conn.close() | |
| return { | |
| "course_name": course_name, | |
| "lectures": lectures | |
| } | |
| except Exception as e: | |
| conn.close() | |
| print(f"β Error fetching course lectures: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def delete_course( | |
| course_id: int, | |
| current_user: dict = Depends(get_current_user) | |
| ): | |
| """Delete a course""" | |
| if current_user["role"] != "admin": | |
| raise HTTPException(status_code=403, detail="Admin access required") | |
| conn = get_db() | |
| c = conn.cursor() | |
| try: | |
| c.execute("SELECT name FROM courses WHERE id = ?", (course_id,)) | |
| course = c.fetchone() | |
| if not course: | |
| conn.close() | |
| raise HTTPException(status_code=404, detail="Course not found") | |
| course_name = course[0] | |
| c.execute("DELETE FROM courses WHERE id = ?", (course_id,)) | |
| conn.commit() | |
| conn.close() | |
| print(f"ποΈ Course deleted: {course_name}") | |
| return { | |
| "success": True, | |
| "message": f"Course '{course_name}' deleted", | |
| "course_id": course_id | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| conn.close() | |
| print(f"β Error deleting course: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ======================= | |
| # Admin Endpoints - Lectures & Stats | |
| # ======================= | |
| async def get_users(current_user: dict = Depends(get_current_user)): | |
| """Get all users""" | |
| if current_user["role"] != "admin": | |
| raise HTTPException(status_code=403, detail="Admin access required") | |
| conn = get_db() | |
| c = conn.cursor() | |
| c.execute("SELECT id, email, role, created_at FROM users ORDER BY created_at DESC") | |
| users = [dict(row) for row in c.fetchall()] | |
| conn.close() | |
| return {"users": users} | |
| async def get_lectures(current_user: dict = Depends(get_current_user)): | |
| """Get all lectures""" | |
| if current_user["role"] != "admin": | |
| raise HTTPException(status_code=403, detail="Admin access required") | |
| conn = get_db() | |
| c = conn.cursor() | |
| try: | |
| c.execute(""" | |
| SELECT id, filename, subject, uploaded_at, | |
| processing_status, total_chunks, total_characters, error_message | |
| FROM lectures | |
| ORDER BY uploaded_at DESC | |
| """) | |
| lectures = [dict(row) for row in c.fetchall()] | |
| conn.close() | |
| return {"lectures": lectures} | |
| except Exception as e: | |
| conn.close() | |
| print(f"β Error in get_lectures: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def upload_lecture( | |
| file: UploadFile = File(...), | |
| subject: str = Form(...), | |
| current_user: dict = Depends(get_current_user) | |
| ): | |
| """Upload and process lecture""" | |
| print(f"\n{'='*60}") | |
| print(f"π€ Upload Request") | |
| print(f" File: {file.filename}") | |
| print(f" Course: {subject}") | |
| print(f" User: {current_user['user_id']}") | |
| print(f"{'='*60}\n") | |
| if current_user['role'] != 'admin': | |
| raise HTTPException(status_code=403, detail="Access denied") | |
| file_ext = os.path.splitext(file.filename)[1].lower() | |
| if file_ext != '.pdf': | |
| raise HTTPException(status_code=400, detail="Only PDF files allowed") | |
| if not subject or subject.strip() == "": | |
| raise HTTPException(status_code=400, detail="Course name required") | |
| unique_filename = f"{uuid.uuid4()}{file_ext}" | |
| filepath = os.path.join(LECTURES_DIR, unique_filename) | |
| try: | |
| with open(filepath, "wb") as f: | |
| content = await file.read() | |
| f.write(content) | |
| print(f"β File saved: {filepath}") | |
| except Exception as e: | |
| print(f"β Failed to save: {e}") | |
| raise HTTPException(status_code=500, detail=f"Save error: {str(e)}") | |
| lecture_id = None | |
| try: | |
| conn = get_db() | |
| c = conn.cursor() | |
| c.execute( | |
| """INSERT INTO lectures | |
| (admin_id, filename, filepath, subject, uploaded_at, processing_status) | |
| VALUES (?, ?, ?, ?, datetime('now'), ?)""", | |
| (current_user['user_id'], file.filename, filepath, subject.strip(), 'processing') | |
| ) | |
| lecture_id = c.lastrowid | |
| conn.commit() | |
| conn.close() | |
| print(f"β Lecture saved to DB: {lecture_id}") | |
| # Process PDF (uncomment when ready) | |
| loop = asyncio.get_event_loop() | |
| result = await loop.run_in_executor( | |
| executor, | |
| process_new_pdf, | |
| filepath, | |
| subject.strip() | |
| ) | |
| if not result['success']: | |
| raise Exception(result.get('error', 'Processing failed')) | |
| conn = get_db() | |
| c = conn.cursor() | |
| c.execute( | |
| """UPDATE lectures | |
| SET processing_status = 'completed', | |
| total_chunks = ?, | |
| total_characters = ? | |
| WHERE id = ?""", | |
| (result['total_chunks'], result['total_characters'], lecture_id) | |
| ) | |
| conn.commit() | |
| conn.close() | |
| print(f"β Processing completed") | |
| return { | |
| "success": True, | |
| "message": "Lecture uploaded successfully", | |
| "lecture_id": lecture_id, | |
| "filename": file.filename, | |
| "subject": subject, | |
| "status": "completed", | |
| "stats": { | |
| "total_chunks": result['total_chunks'], | |
| "total_characters": result['total_characters'] | |
| } | |
| } | |
| except Exception as e: | |
| error_msg = str(e) | |
| print(f"β Error: {error_msg}") | |
| if lecture_id: | |
| try: | |
| conn = get_db() | |
| c = conn.cursor() | |
| c.execute( | |
| """UPDATE lectures | |
| SET processing_status = 'failed', error_message = ? | |
| WHERE id = ?""", | |
| (error_msg, lecture_id) | |
| ) | |
| conn.commit() | |
| conn.close() | |
| except Exception as db_error: | |
| print(f"β οΈ Failed to update error: {db_error}") | |
| if os.path.exists(filepath): | |
| try: | |
| os.remove(filepath) | |
| except: | |
| pass | |
| raise HTTPException(status_code=500, detail=f"Processing error: {error_msg}") | |
| async def delete_lecture( | |
| lecture_id: int, | |
| current_user: dict = Depends(get_current_user) | |
| ): | |
| """Delete a lecture""" | |
| if current_user["role"] != "admin": | |
| raise HTTPException(status_code=403, detail="Admin access required") | |
| conn = get_db() | |
| c = conn.cursor() | |
| c.execute("SELECT filepath FROM lectures WHERE id = ?", (lecture_id,)) | |
| lecture = c.fetchone() | |
| if not lecture: | |
| conn.close() | |
| raise HTTPException(status_code=404, detail="Lecture not found") | |
| filepath = lecture[0] | |
| c.execute("DELETE FROM lectures WHERE id = ?", (lecture_id,)) | |
| conn.commit() | |
| conn.close() | |
| if os.path.exists(filepath): | |
| try: | |
| os.remove(filepath) | |
| print(f"ποΈ Deleted file: {filepath}") | |
| except Exception as e: | |
| print(f"β οΈ Could not delete file: {e}") | |
| return { | |
| "success": True, | |
| "message": "Lecture deleted", | |
| "lecture_id": lecture_id | |
| } | |
| async def get_stats(current_user: dict = Depends(get_current_user)): | |
| """Get comprehensive statistics""" | |
| if current_user["role"] != "admin": | |
| raise HTTPException(status_code=403, detail="Admin access required") | |
| conn = get_db() | |
| c = conn.cursor() | |
| try: | |
| # Users | |
| c.execute("SELECT COUNT(*) FROM users WHERE role = 'student'") | |
| total_students = c.fetchone()[0] | |
| # Lectures | |
| c.execute("SELECT COUNT(*) FROM lectures") | |
| total_lectures = c.fetchone()[0] | |
| c.execute("SELECT COUNT(*) FROM lectures WHERE processing_status = 'completed'") | |
| completed_lectures = c.fetchone()[0] | |
| c.execute("SELECT COUNT(*) FROM lectures WHERE processing_status = 'failed'") | |
| failed_lectures = c.fetchone()[0] | |
| # Courses | |
| c.execute("SELECT COUNT(*) FROM courses") | |
| total_courses = c.fetchone()[0] | |
| # Activity | |
| c.execute("SELECT COUNT(*) FROM conversations WHERE is_deleted = 0") | |
| total_conversations = c.fetchone()[0] | |
| c.execute("SELECT COUNT(*) FROM messages") | |
| total_messages = c.fetchone()[0] | |
| # Feedback | |
| c.execute("SELECT COUNT(*) FROM feedbacks WHERE feedback_type = 'positive'") | |
| positive_feedbacks = c.fetchone()[0] | |
| c.execute("SELECT COUNT(*) FROM feedbacks WHERE feedback_type = 'negative'") | |
| negative_feedbacks = c.fetchone()[0] | |
| # Content stats | |
| c.execute("SELECT SUM(total_chunks) FROM lectures WHERE processing_status = 'completed'") | |
| result = c.fetchone()[0] | |
| total_chunks = result if result else 0 | |
| c.execute("SELECT SUM(total_characters) FROM lectures WHERE processing_status = 'completed'") | |
| result = c.fetchone()[0] | |
| total_characters = result if result else 0 | |
| conn.close() | |
| return { | |
| "stats": { | |
| "users": { | |
| "total_students": total_students | |
| }, | |
| "courses": { | |
| "total": total_courses | |
| }, | |
| "lectures": { | |
| "total": total_lectures, | |
| "completed": completed_lectures, | |
| "failed": failed_lectures, | |
| "processing": total_lectures - completed_lectures - failed_lectures | |
| }, | |
| "content": { | |
| "total_chunks": total_chunks, | |
| "total_characters": total_characters | |
| }, | |
| "activity": { | |
| "total_conversations": total_conversations, | |
| "total_messages": total_messages | |
| }, | |
| "feedback": { | |
| "positive": positive_feedbacks, | |
| "negative": negative_feedbacks, | |
| "total": positive_feedbacks + negative_feedbacks | |
| } | |
| } | |
| } | |
| except Exception as e: | |
| conn.close() | |
| print(f"β Error in get_stats: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ======================= | |
| # User Info | |
| # ======================= | |
| def get_me(current_user: dict = Depends(get_current_user)): | |
| """Get current user info""" | |
| conn = get_db() | |
| c = conn.cursor() | |
| c.execute("SELECT email FROM users WHERE id = ?", (current_user['user_id'],)) | |
| user = c.fetchone() | |
| conn.close() | |
| if user: | |
| current_user['email'] = user['email'] | |
| return current_user | |
| # ======================= | |
| # Health Check | |
| # ======================= | |
| async def health_check(): | |
| """Health check endpoint""" | |
| return { | |
| "status": "healthy", | |
| "database": "connected", | |
| "timestamp": datetime.now(timezone.utc).isoformat(), | |
| "version": "3.1.0" | |
| } | |
| # ======================= | |
| # Run Server | |
| # ======================= | |
| if __name__ == "__main__": | |
| import uvicorn | |
| print("\n" + "="*60) | |
| print("π Starting University AI Chatbot API") | |
| print("="*60) | |
| print(f"π API URL: http://localhost:8080") | |
| print(f"π Docs: http://localhost:8080/docs") | |
| print(f"π€ Admin: admin@university.edu / Admin123") | |
| print("="*60 + "\n") | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |