import json
import logging
import os
import tempfile

import torch
from flask import Blueprint, jsonify, request, send_file

from audio_transcription import perform_forced_alignment
from env_vars import API_LOG_LEVEL, MODEL_NAME
from media_transcription_processor import MediaTranscriptionProcessor
from omnilingual_asr.models.wav2vec2_llama.lang_ids import supported_langs
from transcription_status import transcription_status
from video_utils import check_ffmpeg_available, combine_video_with_subtitles

transcriptions_blueprint = Blueprint(
    "transcriptions_blueprint",
    __name__,
)

logger = logging.getLogger(__name__)
# setLevel() (rather than assigning .level directly) accepts both string and
# numeric level values and keeps the logging hierarchy's caches consistent.
logger.setLevel(API_LOG_LEVEL)
logging.getLogger("boto3").setLevel(API_LOG_LEVEL)
logging.getLogger("botocore").setLevel(API_LOG_LEVEL)

# Media at or below this duration is treated as short-form.
MAX_SHORTFORM_DURATION = 10  # seconds


@transcriptions_blueprint.route("/health")
def health():
    """Comprehensive health check endpoint.

    Reports service liveness plus device (CPU/CUDA), FFmpeg availability,
    current transcription status, and — when CUDA is present — GPU count,
    name, and memory usage.
    """
    cuda_available = torch.cuda.is_available()
    device = torch.device("cuda:0" if cuda_available else "cpu")
    ffmpeg_available = check_ffmpeg_available()

    # Current busy/idle state and progress of the single transcription slot.
    transcription_info = MediaTranscriptionProcessor.get_server_status()

    gpu_info = {}
    if cuda_available:
        gpu_info = {
            "gpu_count": torch.cuda.device_count(),
            "current_device": torch.cuda.current_device(),
            "gpu_name": (
                torch.cuda.get_device_name(0)
                if torch.cuda.device_count() > 0
                else "Unknown"
            ),
        }
        # Memory stats are best-effort: a failure here must not fail /health.
        try:
            current_device = torch.cuda.current_device()
            memory_allocated = torch.cuda.memory_allocated(current_device)
            memory_reserved = torch.cuda.memory_reserved(current_device)
            memory_total = torch.cuda.get_device_properties(
                current_device
            ).total_memory
            gpu_info.update(
                {
                    "gpu_memory_allocated_mb": round(memory_allocated / 1024 / 1024, 1),
                    "gpu_memory_reserved_mb": round(memory_reserved / 1024 / 1024, 1),
                    "gpu_memory_total_mb": round(memory_total / 1024 / 1024, 1),
                    "gpu_memory_free_mb": round(
                        (memory_total - memory_reserved) / 1024 / 1024, 1
                    ),
                }
            )
        except Exception as e:
            logger.warning("Could not get GPU memory info: %s", e)

    return {
        "status": "healthy",
        "message": "MMS Transcription API is running",
        "version": "1.0.0",
        "service": "mms-transcription",
        "device": str(device),
        "cuda_available": cuda_available,
        "ffmpeg_available": ffmpeg_available,
        "transcription_status": transcription_info,
        **gpu_info,
    }


@transcriptions_blueprint.route("/supported-languages")
def get_supported_languages():
    """Get list of supported languages for transcription."""
    try:
        return jsonify({
            "supported_languages": supported_langs,
        })
    except Exception as e:
        logger.error("Error getting supported languages: %s", str(e))
        return jsonify({
            "error": "Could not retrieve supported languages",
            "message": str(e)
        }), 500


@transcriptions_blueprint.route("/status")
def get_transcription_status():
    """Get current transcription status."""
    return jsonify(MediaTranscriptionProcessor.get_server_status())


@transcriptions_blueprint.route("/transcribe", methods=["POST"])
def transcribe_audio():
    """Transcribe media using the MMS model with intelligent chunking for all audio/video files.

    Form fields:
        media: audio/video file to transcribe (required).
        language: optional language code; auto-detection when omitted.
        include_preprocessed: "true" to include the preprocessed audio in the
            response (also accepted as a query-string parameter).

    Returns:
        JSON with transcription text, aligned segments, chunk metadata and
        model/device info. 400 on bad input, 503 when busy, 500 on failure.
    """
    try:
        # Single-slot server: reject outright while another job is running.
        if MediaTranscriptionProcessor.is_server_busy():
            status = MediaTranscriptionProcessor.get_server_status()
            return (
                jsonify(
                    {
                        "error": "Server is currently processing another transcription",
                        "status": "busy",
                        "current_operation": status.get("current_operation"),
                    }
                ),
                503,
            )

        if "media" not in request.files:
            return jsonify({"error": "No media file provided"}), 400

        media_file = request.files["media"]
        if media_file.filename == "":
            return jsonify({"error": "No file selected"}), 400

        language_with_script = request.form.get("language", None)
        if language_with_script:
            logger.info("Language specified: %s", language_with_script)
        else:
            logger.info("No language specified, using auto-detection")

        # Flag may arrive in the form body or the query string.
        include_preprocessed = (
            request.form.get("include_preprocessed", "false").lower() == "true"
            or request.args.get("include_preprocessed", "false").lower() == "true"
        )
        if include_preprocessed:
            logger.info("Preprocessed audio will be included in response")

        # Read the upload once; the processor works on raw bytes.
        media_bytes = media_file.read()

        try:
            # Context manager handles cleanup and status finalization
            # (busy flag set on enter, cleared on exit).
            with MediaTranscriptionProcessor(
                media_bytes, media_file.filename, language_with_script
            ) as processor:
                processor.start_transcription()

                # Stage 1: convert media (also calculates duration and
                # updates progress).
                processor.convert_media()
                logger.info(
                    "Media conversion completed for: %s", media_file.filename
                )

                # Stage 2: run the full transcription pipeline (this also
                # updates progress).
                processor.transcribe_full_pipeline()

                results = processor.get_results(
                    include_preprocessed_audio=include_preprocessed
                )
                logger.info(
                    "Transcription completed: %s chunks",
                    results.get("num_chunks", 0),
                )

                response = {
                    "transcription": results.get("transcription", ""),
                    "aligned_segments": results.get("aligned_segments", []),
                    "chunks": results.get("chunks", []),
                    "total_duration": results.get("total_duration", 0.0),
                    "num_chunks": results.get("num_chunks", 0),
                    "num_segments": results.get("num_segments", 0),
                    "model": MODEL_NAME,
                    "device": str(
                        torch.device(
                            "cuda:0" if torch.cuda.is_available() else "cpu"
                        )
                    ),
                    "status": results.get("status", "success"),
                }

                if "preprocessed_audio" in results:
                    response["preprocessed_audio"] = results["preprocessed_audio"]

                if "error" in results:
                    response["error"] = results["error"]
                    logger.error(
                        "Transcription response with error: %s", response
                    )
                    return jsonify(response), 500

                # Debug dump of the response. The preprocessed-audio payload is
                # redacted: it can be very large and may not be JSON-serializable,
                # which previously could crash this log line with a TypeError.
                debug_response = dict(response)
                if "preprocessed_audio" in debug_response:
                    debug_response["preprocessed_audio"] = "<redacted>"
                logger.info("=== TRANSCRIBE RESPONSE ===")
                logger.info(
                    "Full response: %s",
                    json.dumps(debug_response, indent=2, default=str),
                )
                logger.info("=== END TRANSCRIBE RESPONSE ===")

                return jsonify(response)
            # Context manager automatically handles cleanup and status
            # finalization here.
        except Exception as e:
            logger.error("Media conversion/transcription error: %s", str(e))
            return jsonify({"error": f"Media processing failed: {str(e)}"}), 500

    except Exception as e:
        logger.error("Transcription error: %s", str(e))
        return jsonify({"error": f"Transcription failed: {str(e)}"}), 500


@transcriptions_blueprint.route("/combine-video-subtitles", methods=["POST"])
def combine_video_subtitles():
    """Combine video with subtitles using FFmpeg.

    Form fields:
        video: video file (required).
        subtitles: subtitle text (required).
        format: subtitle format, "srt" (default) or "webvtt".
        output_format: container, "mp4" (default) or "mkv".
        language: subtitle language code, default "eng".

    Returns:
        The combined video as a file attachment; 400 on bad input,
        503 when busy, 500 on failure.
    """
    try:
        # Single-slot server: reject outright while another job is running.
        if MediaTranscriptionProcessor.is_server_busy():
            status = MediaTranscriptionProcessor.get_server_status()
            return (
                jsonify(
                    {
                        "error": "Server is currently processing another request",
                        "status": "busy",
                        "current_operation": status.get("current_operation"),
                    }
                ),
                503,
            )

        if "video" not in request.files:
            return jsonify({"error": "No video file provided"}), 400
        if "subtitles" not in request.form:
            return jsonify({"error": "No subtitles provided"}), 400

        video_file = request.files["video"]
        subtitles = request.form["subtitles"]

        if video_file.filename == "":
            return jsonify({"error": "No video file selected"}), 400

        subtitle_format = request.form.get("format", "srt")  # srt or webvtt
        output_format = request.form.get("output_format", "mp4")  # mp4 or mkv
        language = request.form.get("language", "eng")

        # Mark busy only after validation has passed, and finish exactly once
        # in the finally below. The previous version also called
        # finish_transcription() in the outer except handler, double-finishing
        # on errors and "finishing" requests that were never started.
        transcription_status.start_transcription(
            "combine_video", video_file.filename
        )
        try:
            transcription_status.update_progress(0.1)

            # Persist the upload so FFmpeg can read it from disk.
            with tempfile.NamedTemporaryFile(
                delete=False, suffix=os.path.splitext(video_file.filename)[1]
            ) as temp_video:
                video_file.save(temp_video.name)
                temp_video_path = temp_video.name

            transcription_status.update_progress(0.3)

            try:
                # Combine video with subtitles using video_utils function.
                output_path = combine_video_with_subtitles(
                    temp_video_path,
                    subtitles,
                    subtitle_format,
                    output_format,
                    language,
                )
                transcription_status.update_progress(0.9)
                logger.info("Video combination completed: %s", output_path)

                # NOTE(review): output_path is never deleted here — confirm
                # combine_video_with_subtitles (or a cleanup job) removes it.
                return send_file(
                    output_path,
                    as_attachment=True,
                    download_name=f"{video_file.filename.rsplit('.', 1)[0]}_with_subtitles.{output_format}",
                    mimetype=f"video/{output_format}",
                )
            finally:
                # Best-effort removal of the temporary input file.
                try:
                    os.unlink(temp_video_path)
                except OSError:
                    pass
        finally:
            # Runs on both success and error paths.
            transcription_status.finish_transcription()
    except Exception as e:
        logger.error("Video combination error: %s", str(e))
        return jsonify({"error": f"Video combination failed: {str(e)}"}), 500