# PortfolioMind: src/core/langgraph_supervisor.py
from typing import Any, Dict, List, Optional, TypedDict, Annotated, Sequence, AsyncGenerator
import operator
import asyncio
from asyncio import Queue
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from src.core.config import config
class AgentState(TypedDict):
"""State for the ReAct agent pattern."""
messages: Annotated[Sequence[BaseMessage], operator.add]
query: str
agent_outputs: Dict[str, Any]
reasoning_steps: List[str]
final_answer: Optional[str]
current_step: int
    max_steps: int
    next_action: Optional[str]  # routing decision recorded by think_node
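

# Illustrative shape of a mid-run state (values are examples, not real output):
#   {
#       "query": "What's Bitcoin doing today?",
#       "messages": [HumanMessage(...), AIMessage(...)],
#       "agent_outputs": {"crypto": {"success": True, "response": "..."}},
#       "reasoning_steps": ["Step 1: Need market data -> Action: crypto"],
#       "final_answer": None,
#       "current_step": 1,
#       "max_steps": 5,
#       "next_action": "crypto",
#   }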
class ReActSupervisor:
"""Supervisor using ReAct pattern for multi-agent orchestration with streaming."""
def __init__(
self,
crypto_agent=None,
rag_agent=None,
stock_agent=None,
search_agent=None,
finance_tracker=None,
max_steps: int = 5
):
"""
Initialize the ReAct supervisor.
Args:
crypto_agent: Crypto agent instance
rag_agent: RAG agent instance
stock_agent: Stock agent instance
search_agent: Web search agent instance (DuckDuckGo)
finance_tracker: Finance tracker agent instance
max_steps: Maximum reasoning steps before forcing completion
"""
self.crypto_agent = crypto_agent
self.rag_agent = rag_agent
self.stock_agent = stock_agent
self.search_agent = search_agent
self.finance_tracker = finance_tracker
self.max_steps = max_steps
self.streaming_callback = None # For streaming updates
# Initialize supervisor LLM with structured output
self.supervisor_llm = ChatGoogleGenerativeAI(
model="gemini-2.5-pro",
temperature=0.1,
google_api_key=config.GOOGLE_API_KEY,
)
# Build the ReAct workflow
self.graph = self._build_react_graph()
self.compiled_graph = self.graph.compile()
def _build_react_graph(self) -> StateGraph:
"""Build the ReAct pattern graph."""
# Create the state graph
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("think", self.think_node)
workflow.add_node("act_crypto", self.act_crypto_node)
workflow.add_node("act_rag", self.act_rag_node)
workflow.add_node("act_stock", self.act_stock_node)
workflow.add_node("act_search", self.act_search_node)
workflow.add_node("act_finance_tracker", self.act_finance_tracker_node)
workflow.add_node("observe", self.observe_node)
workflow.add_node("finish", self.finish_node)
# Set entry point
workflow.set_entry_point("think")
# Add routing from think node
workflow.add_conditional_edges(
"think",
self.route_from_thinking,
{
"crypto": "act_crypto",
"rag": "act_rag",
"stock": "act_stock",
"search": "act_search",
"finance_tracker": "act_finance_tracker",
"finish": "finish",
}
)
# Actions lead to observe
workflow.add_edge("act_crypto", "observe")
workflow.add_edge("act_rag", "observe")
workflow.add_edge("act_stock", "observe")
workflow.add_edge("act_search", "observe")
workflow.add_edge("act_finance_tracker", "observe")
# Observe leads back to think (or finish if max steps)
workflow.add_conditional_edges(
"observe",
self.should_continue,
{
"continue": "think",
"finish": "finish"
}
)
# Finish ends the graph
workflow.add_edge("finish", END)
return workflow
async def _emit_update(self, update: Dict[str, Any]):
"""Emit streaming update if callback is set."""
if self.streaming_callback:
await self.streaming_callback(update)
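
    # Shapes of updates emitted through the streaming callback (illustrative;
    # see the emitting nodes below for the authoritative fields):
    #   {"type": "thinking", "step": 1, "thought": "...", "action": "crypto",
    #    "justification": "..."}
    #   {"type": "action", "agent": "crypto"}
    #   {"type": "observation", "agent": "crypto", "summary": "..."}
    #   {"type": "final_start"} / {"type": "final_token", "token": "...",
    #    "accumulated": "..."} / {"type": "final_complete", "response": "..."}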
async def think_node(self, state: AgentState) -> Dict[str, Any]:
"""Reasoning step: Analyze current state and decide next action."""
current_step = state.get("current_step", 0) + 1
# Build context from previous outputs
context = self._build_context(state)
# Create reasoning prompt
think_prompt = f"""You are a ReAct supervisor orchestrating multiple agents to answer user queries.
Current Query: {state['query']}
Available Actions:
- CALL_CRYPTO: Get cryptocurrency market data, prices, trends
- CALL_STOCK: Get stock market data, company information, financial data
- CALL_FINANCE_TRACKER: Manage personal stock portfolio (add transactions, view positions, analyze performance, get portfolio news)
- CALL_RAG: Search and retrieve information from uploaded documents
- CALL_SEARCH: Search the web for current information, news, or general knowledge
- FINISH: Provide final answer (use when you have sufficient information)
Current Step: {current_step}/{self.max_steps}
Information Gathered So Far:
{context}
IMPORTANT INSTRUCTIONS:
1. Check what information you ALREADY HAVE in the context above
2. Do NOT call the same agent twice unless you need different information
3. If you already have an answer from any agent, move to FINISH
4. Only call another agent if you need ADDITIONAL different information
5. Use CALL_SEARCH for general knowledge, current events, and news
6. FINISH when you have enough information to answer the user's query
Based on what you know so far, reason about what to do next.
Format your response as:
THOUGHT: [Analyze what you have and what you still need]
ACTION: [CALL_CRYPTO | CALL_STOCK | CALL_FINANCE_TRACKER | CALL_RAG | CALL_SEARCH | FINISH]
JUSTIFICATION: [Why this action will help]"""
response = await self.supervisor_llm.ainvoke([
SystemMessage(content="You are a ReAct supervisor. Think step by step and avoid redundant actions."),
HumanMessage(content=think_prompt)
])
# Parse the response
content = response.content
thought = ""
action = "finish"
justification = ""
if "THOUGHT:" in content:
thought = content.split("THOUGHT:")[1].split("ACTION:")[0].strip()
if "ACTION:" in content:
action_text = content.split("ACTION:")[1].split("\n")[0].strip().upper()
if "CRYPTO" in action_text:
action = "crypto"
elif "FINANCE_TRACKER" in action_text or "FINANCE" in action_text:
action = "finance_tracker"
elif "STOCK" in action_text:
action = "stock"
elif "RAG" in action_text:
action = "rag"
elif "SEARCH" in action_text:
action = "search"
else:
action = "finish"
if "JUSTIFICATION:" in content:
justification = content.split("JUSTIFICATION:")[1].strip()
# Add reasoning step
        # Copy before appending: reasoning_steps has no reducer, so return a
        # fresh list instead of mutating shared state in place
        reasoning_steps = list(state.get("reasoning_steps", []))
        reasoning_steps.append(f"Step {current_step}: {thought} -> Action: {action}")
print(f"\nThinking (Step {current_step}):")
print(f" Thought: {thought}")
print(f" Action: {action}")
print(f" Justification: {justification}")
# Emit streaming update
await self._emit_update({
"type": "thinking",
"step": current_step,
"thought": thought,
"action": action,
"justification": justification
})
return {
"current_step": current_step,
"reasoning_steps": reasoning_steps,
"messages": [AIMessage(content=f"Thought: {thought}\nAction: {action}")],
"next_action": action
}
async def act_crypto_node(self, state: AgentState) -> Dict[str, Any]:
"""Execute crypto agent and return raw output."""
        # Copy before updating: agent_outputs has no reducer, so the returned
        # dict replaces the state value and must carry prior outputs forward
        agent_outputs = dict(state.get("agent_outputs", {}))
        if not self.crypto_agent:
            agent_outputs["crypto_error"] = "Crypto agent not available"
            return {"agent_outputs": agent_outputs}
        print(" Calling Crypto Agent...")
        await self._emit_update({"type": "action", "agent": "crypto"})
        result = await self.crypto_agent.process(
            state["query"],
            history=self._extract_history(state["messages"])
        )
        agent_outputs["crypto"] = result
        return {"agent_outputs": agent_outputs}
async def act_rag_node(self, state: AgentState) -> Dict[str, Any]:
"""Execute RAG agent and return raw output."""
        agent_outputs = dict(state.get("agent_outputs", {}))  # copy to preserve prior outputs
        if not self.rag_agent:
            agent_outputs["rag_error"] = "RAG agent not available"
            return {"agent_outputs": agent_outputs}
        print(" Calling RAG Agent...")
        await self._emit_update({"type": "action", "agent": "rag"})
        result = await self.rag_agent.process(
            state["query"],
            history=self._extract_history(state["messages"])
        )
        agent_outputs["rag"] = result
        return {"agent_outputs": agent_outputs}
async def act_stock_node(self, state: AgentState) -> Dict[str, Any]:
"""Execute stock agent and return raw output."""
        agent_outputs = dict(state.get("agent_outputs", {}))  # copy to preserve prior outputs
        if not self.stock_agent:
            agent_outputs["stock_error"] = "Stock agent not available"
            return {"agent_outputs": agent_outputs}
        print(" Calling Stock Agent...")
        await self._emit_update({"type": "action", "agent": "stock"})
        result = await self.stock_agent.process(
            state["query"],
            history=self._extract_history(state["messages"])
        )
        agent_outputs["stock"] = result
        return {"agent_outputs": agent_outputs}
async def act_search_node(self, state: AgentState) -> Dict[str, Any]:
"""Execute web search agent and return raw output."""
        agent_outputs = dict(state.get("agent_outputs", {}))  # copy to preserve prior outputs
        if not self.search_agent:
            agent_outputs["search_error"] = "Search agent not available"
            return {"agent_outputs": agent_outputs}
        print(" Calling Web Search Agent...")
        await self._emit_update({"type": "action", "agent": "search"})
        result = await self.search_agent.process(
            state["query"],
            history=self._extract_history(state["messages"])
        )
        agent_outputs["search"] = result
        return {"agent_outputs": agent_outputs}
async def act_finance_tracker_node(self, state: AgentState) -> Dict[str, Any]:
"""Execute finance tracker agent and return raw output."""
        agent_outputs = dict(state.get("agent_outputs", {}))  # copy to preserve prior outputs
        if not self.finance_tracker:
            agent_outputs["finance_tracker_error"] = "Finance Tracker agent not available"
            return {"agent_outputs": agent_outputs}
        print(" Calling Finance Tracker Agent...")
        await self._emit_update({"type": "action", "agent": "finance_tracker"})
        result = await self.finance_tracker.process(
            state["query"],
            history=self._extract_history(state["messages"])
        )
        agent_outputs["finance_tracker"] = result
        return {"agent_outputs": agent_outputs}
async def observe_node(self, state: AgentState) -> Dict[str, Any]:
"""Process and observe the latest agent output."""
agent_outputs = state.get("agent_outputs", {})
latest_output = "No new observations"
latest_agent = "unknown"
search_urls = None
tool_calls = None
        if agent_outputs:
            # Inspect the last entry in insertion order (heuristic for the
            # most recent agent output)
            agent_name, output = list(agent_outputs.items())[-1]
            if isinstance(output, dict) and output.get("success"):
                latest_output = output.get("response", "No response")
                latest_agent = agent_name
                # Extract search URLs if available (from the search agent)
                search_urls = output.get("search_urls")
                # Extract tool calls if available (from MCP agents)
                tool_calls = output.get("tool_calls")
# Prepend tool calls to the summary if available
summary = latest_output
if tool_calls:
tool_calls_text = "\n".join(tool_calls)
summary = f"{tool_calls_text}\n Observation from {latest_agent}: {latest_output}"
# Apply length limit
summary = summary[:2000] + "..." if len(summary) > 2000 else summary
print(f" Observation from {latest_agent}: {summary}")
# Emit streaming update with search URLs if available
update_data = {
"type": "observation",
"agent": latest_agent,
"summary": summary
}
if search_urls:
update_data["search_urls"] = search_urls
await self._emit_update(update_data)
return {
"messages": [AIMessage(content=f"Observation from {latest_agent}:\n{latest_output}")]
}
async def finish_node(self, state: AgentState) -> Dict[str, Any]:
"""Synthesize all agent outputs and generate final answer with streaming."""
agent_outputs = state.get("agent_outputs", {})
reasoning_steps = state.get("reasoning_steps", [])
print("\nSupervisor Synthesizing Final Answer...")
# Build synthesis prompt with conversation history
synthesis_prompt = f"""You are synthesizing information to answer this query: {state['query']}
"""
# Include conversation history for context
messages = state.get("messages", [])
if messages:
synthesis_prompt += "CONVERSATION HISTORY:\n"
for msg in messages:
if isinstance(msg, HumanMessage):
synthesis_prompt += f"User: {msg.content}\n"
elif isinstance(msg, AIMessage):
synthesis_prompt += f"Assistant: {msg.content}\n"
synthesis_prompt += "\n"
synthesis_prompt += f"""Your reasoning process:
{chr(10).join(reasoning_steps)}
Information gathered from agents:"""
for agent_name, output in agent_outputs.items():
if isinstance(output, dict) and output.get("success"):
synthesis_prompt += f"\n\n{agent_name.upper()} Agent Response:\n{output.get('response', 'No response')}"
synthesis_prompt += """
Now provide a comprehensive, well-structured answer that:
1. Directly addresses the user's query (considering the conversation history if present)
2. Integrates insights from all relevant agent outputs
3. Is clear and actionable
4. Highlights any important findings or recommendations
5. Cites sources when appropriate
Final Answer:"""
# Emit start of final answer
await self._emit_update({
"type": "final_start"
})
        # Stream the final answer token by token
final_answer = ""
async for chunk in self.supervisor_llm.astream([
SystemMessage(content="You are providing the final, synthesized answer."),
HumanMessage(content=synthesis_prompt)
]):
if hasattr(chunk, 'content') and chunk.content:
# Clean unicode artifacts from Gemini streaming
clean_content = chunk.content.replace('∗', '*')
final_answer += clean_content
# Emit each token/chunk as it arrives
await self._emit_update({
"type": "final_token",
"token": clean_content,
"accumulated": final_answer
})
# Emit completion of final answer
await self._emit_update({
"type": "final_complete",
"response": final_answer
})
return {
"final_answer": final_answer,
"messages": [AIMessage(content=final_answer)]
}
def route_from_thinking(self, state: AgentState) -> str:
"""Route based on thinking decision."""
        # Prefer the structured decision recorded by think_node
        next_action = state.get("next_action")
        if next_action in {"crypto", "finance_tracker", "stock", "rag", "search", "finish"}:
            return next_action
        # Fall back to parsing the last message text
        last_message = state["messages"][-1] if state["messages"] else None
        if last_message and "Action:" in last_message.content:
            try:
                action_line = last_message.content.split("Action:")[1].split("\n")[0].strip().upper()
                if "CRYPTO" in action_line:
                    return "crypto"
                elif "FINANCE" in action_line:  # covers (CALL_)FINANCE_TRACKER
                    return "finance_tracker"
                elif "STOCK" in action_line:
                    return "stock"
                elif "RAG" in action_line:
                    return "rag"
                elif "SEARCH" in action_line:
                    return "search"
                elif "FINISH" in action_line:
                    return "finish"
            except (IndexError, AttributeError):
                pass
        return "finish"
def should_continue(self, state: AgentState) -> str:
"""Decide whether to continue reasoning or finish."""
current_step = state.get("current_step", 0)
if current_step >= self.max_steps:
print(f" Max steps ({self.max_steps}) reached, finishing...")
return "finish"
return "continue"
def _build_context(self, state: AgentState) -> str:
"""Build context string from current state, including conversation history."""
context_parts = []
# Include conversation history for context
messages = state.get("messages", [])
if messages:
history_text = "=== CONVERSATION HISTORY ===\n"
for msg in messages:
if isinstance(msg, HumanMessage):
history_text += f"User: {msg.content}\n"
elif isinstance(msg, AIMessage):
history_text += f"Assistant: {msg.content}\n"
context_parts.append(history_text)
# Include agent outputs from current query
agent_outputs = state.get("agent_outputs", {})
if agent_outputs:
for agent_name, output in agent_outputs.items():
if isinstance(output, dict) and output.get("success"):
response = output.get("response", "No response")
# Increased limit to provide more context to LLM
if len(response) > 5000:
response = response[:5000] + f"... [Response continues for {len(response)} total chars]"
context_parts.append(f"=== {agent_name.upper()} Agent ===\n{response}")
return "\n\n".join(context_parts) if context_parts else "No information gathered yet."
def _extract_history(self, messages: Sequence[BaseMessage]) -> List[Dict[str, str]]:
"""Extract chat history from messages."""
history = []
for msg in messages:
if isinstance(msg, HumanMessage):
history.append({"user": msg.content})
elif isinstance(msg, AIMessage):
history.append({"assistant": msg.content})
return history[-10:]
async def process(self, query: str, history: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]:
"""Process query through ReAct supervisor pattern."""
initial_state: AgentState = {
"messages": [],
"query": query,
"agent_outputs": {},
"reasoning_steps": [],
"final_answer": None,
"current_step": 0,
"max_steps": self.max_steps
}
if history:
for turn in history[-3:]:
if "user" in turn:
initial_state["messages"].append(HumanMessage(content=turn["user"]))
if "assistant" in turn:
initial_state["messages"].append(AIMessage(content=turn["assistant"]))
try:
print(f"\nReAct Supervisor starting for query: '{query}'")
print(f" Max steps: {self.max_steps}")
final_state = await self.compiled_graph.ainvoke(initial_state)
return {
"success": True,
"response": final_state.get("final_answer", "No answer generated"),
"reasoning_steps": final_state.get("reasoning_steps", []),
"agent_outputs": final_state.get("agent_outputs", {}),
"steps_taken": final_state.get("current_step", 0)
}
except Exception as e:
print(f" ReAct Supervisor error: {e}")
return {
"success": False,
"error": str(e),
"response": f"Supervisor error: {str(e)}"
}
async def process_streaming(
self,
query: str,
history: Optional[List[Dict[str, str]]] = None
) -> AsyncGenerator[Dict[str, Any], None]:
"""
        Process the query with true async streaming via an asyncio Queue.
        Yields update dictionaries with types: thinking, action, observation,
        final_start, final_token, final_complete, and error.
"""
updates_queue = Queue()
async def callback(update: Dict[str, Any]):
"""Non-blocking callback to queue updates."""
await updates_queue.put(update)
# Set streaming callback
self.streaming_callback = callback
# Start processing in background
result_task = asyncio.create_task(self.process(query, history))
        # Relay queued updates while the background task runs
try:
while not result_task.done():
try:
                    # Blocking wait with a short timeout so the loop notices
                    # promptly when the background task completes
                    update = await asyncio.wait_for(
                        updates_queue.get(),
                        timeout=0.01
)
yield update
except asyncio.TimeoutError:
# No update available yet, continue loop
continue
except Exception as e:
print(f"Warning: Error getting update from queue: {e}")
continue
# Drain any remaining updates from the queue
while not updates_queue.empty():
try:
update = await updates_queue.get()
yield update
except Exception as e:
print(f"Warning: Error draining queue: {e}")
break
# Handle final result
result = await result_task
if not result.get("success"):
yield {
"type": "error",
"error": result.get("error", "Unknown error")
}
except Exception as e:
yield {
"type": "error",
"error": str(e)
}
finally:
self.streaming_callback = None
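
# Hedged usage sketch (illustrative; agent instances and an API key are assumed):
#
#     supervisor = ReActSupervisor(crypto_agent=..., stock_agent=..., max_steps=5)
#     async for update in supervisor.process_streaming("How is BTC trending?"):
#         if update["type"] == "final_token":
#             print(update["token"], end="", flush=True)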
async def query_multi_agent_system_async(message: str) -> str:
"""
MCP tool wrapper for ElevenLabs integration.
Wraps the ReAct supervisor to return a clean string response.
Args:
message (str): User's question or request
Returns:
str: Final answer from the multi-agent system
"""
from src.agents.crypto_agent_mcp import get_crypto_agent
from src.agents.stock_agent_mcp import get_stock_agent
from src.agents.finance_tracker_agent_mcp import get_finance_tracker_agent
from src.agents.rag_agent_mcp import get_rag_agent
from src.agents.search_agent_mcp import get_search_agent
# Create supervisor instance with all agents
supervisor = ReActSupervisor(
crypto_agent=get_crypto_agent(config),
rag_agent=get_rag_agent(config),
stock_agent=get_stock_agent(config),
search_agent=get_search_agent(config),
finance_tracker=get_finance_tracker_agent(config),
max_steps=5
)
# Process the query
result = await supervisor.process(query=message, history=None)
# Return just the final answer
return result.get("response", "No answer generated")
def query_multi_agent_system(message: str) -> str:
"""
Synchronous MCP tool wrapper for ElevenLabs integration.
This is the function that will be exposed as an MCP tool.
Args:
message (str): User's question or request
Returns:
str: Final answer from the multi-agent system
"""
    # asyncio.get_event_loop() is deprecated for implicit loop creation, and
    # run_until_complete() would fail anyway inside a running loop, so fail
    # loudly in that case and otherwise let asyncio.run() manage the loop.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread: safe to create one
        return asyncio.run(query_multi_agent_system_async(message))
    raise RuntimeError(
        "query_multi_agent_system() cannot be called from a running event loop; "
        "await query_multi_agent_system_async() instead."
    )
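

# Minimal smoke-test sketch, assuming GOOGLE_API_KEY and the agent dependencies
# above are configured; for local runs only, not part of the MCP tool surface.
if __name__ == "__main__":
    print(query_multi_agent_system("What is the current price of Bitcoin?"))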