import os
import sys
import json
import subprocess
import hashlib
import re
import threading
import datetime # <-- NEW
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Generator, Optional, Callable
from dotenv import load_dotenv
import ollama
from cerebras.cloud.sdk import Cerebras
from ddgs import DDGS
from langchain_ollama import OllamaEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from rich.console import Console
from rich.prompt import Prompt
from rich.panel import Panel
load_dotenv()
# ==========================================
# 1. Configuration & Constants
# ==========================================
LOCAL_LLM = "qwen3-vl:8b"
LOCAL_EMBED_MODEL = "nomic-embed-text-v2-moe:latest"
PKM_DIR = os.path.expanduser("~/monorepo")
XDG_CONFIG_HOME = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
APP_CONFIG_DIR = os.path.join(XDG_CONFIG_HOME, "cerebral")
APP_CACHE_DIR = os.path.expanduser("~/.cache/cerebral")
ORG_OUTPUT_DIR = os.path.expanduser("~/org/cerebral")
os.makedirs(APP_CONFIG_DIR, exist_ok=True)
os.makedirs(APP_CACHE_DIR, exist_ok=True)
os.makedirs(ORG_OUTPUT_DIR, exist_ok=True)
MEMORY_FILE = os.path.join(APP_CACHE_DIR, "memory_summary.txt")
MEMORY_INDEX_PATH = os.path.join(APP_CACHE_DIR, "memory_index")
FAISS_INDEX_PATH = os.path.join(APP_CONFIG_DIR, "pkm_index")
HASH_TRACKER_FILE = os.path.join(APP_CONFIG_DIR, "latest_commit.txt")
# ==========================================
# 2. Abstract LLM Provider
# ==========================================
class BaseLLMProvider(ABC):
"""Abstract interface for LLM providers to ensure easy swapping."""
@abstractmethod
# <-- UPDATED: Added tool_choice parameter
def chat_completion(self, messages: List[Dict], tools: List[Dict] = None, stream: bool = False, tool_choice: str = "auto") -> Any:
pass
class CerebrasProvider(BaseLLMProvider):
def __init__(self, model: str = "qwen-3-235b-a22b-instruct-2507"):
api_key = os.environ.get("CEREBRAS_API_KEY")
if not api_key:
raise ValueError("CEREBRAS_API_KEY environment variable is required.")
self.client = Cerebras(api_key=api_key)
self.model = model
def chat_completion(self, messages: List[Dict], tools: List[Dict] = None, stream: bool = False, tool_choice: str = "auto"):
kwargs = {
"messages": messages,
"model": self.model,
"stream": stream,
}
if tools:
kwargs["tools"] = tools
kwargs["tool_choice"] = tool_choice # <-- UPDATED
return self.client.chat.completions.create(**kwargs)
# ==========================================
# 3. Core Modules
# ==========================================
class MemoryManager:
def __init__(self, memory_file: str, index_path: str, local_model: str, embed_model_name: str, log: Callable[[str], None] = print):
self.memory_file = memory_file
self.index_path = index_path
self.local_model = local_model
self.log = log
self.session_summary = "Session just started. No prior context."
self.interaction_buffer = []
self.COMPRESSION_THRESHOLD = 4
self.embeddings = OllamaEmbeddings(model=embed_model_name)
self.text_splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50)
self.log("[dim italic]Loading persistent memory...[/dim italic]")
if os.path.exists(self.memory_file):
with open(self.memory_file, "r") as f:
self.persistent_memory = f.read().strip()
else:
self.persistent_memory = "No known user facts or long-term preferences."
if os.path.exists(self.index_path):
self.vectorstore = FAISS.load_local(self.index_path, self.embeddings, allow_dangerous_deserialization=True)
else:
self.log("[bold yellow]No memory index found. Building initial database...[/bold yellow]")
self.rebuild_index()
def get_line_count(self) -> int:
if not os.path.exists(self.memory_file):
return 0
with open(self.memory_file, "r") as f:
return sum(1 for _ in f)
def rebuild_index(self):
self.log("[dim italic]Reserializing memory log into vector database...[/dim italic]")
text = self.persistent_memory if self.persistent_memory else "No known user facts or long-term preferences."
chunks = self.text_splitter.split_text(text)
docs = [Document(page_content=c) for c in chunks]
self.vectorstore = FAISS.from_documents(docs, self.embeddings)
self.vectorstore.save_local(self.index_path)
self.log("[bold green]Memory database manually rebuilt and saved![/bold green]")
def compress_persistent_memory(self):
self.log("[bold yellow]Compressing persistent memory (removing duplicates and irrelevant data)...[/bold yellow]")
if not os.path.exists(self.memory_file):
self.log("[dim]Memory file is empty. Nothing to compress.[/dim]")
return
# STRICT PROMPT FOR COMPRESSION
sys_prompt = """You are a strictly robotic data deduplication script. Your ONLY job is to compress the provided memory log.
RULES:
1. Remove duplicate facts.
2. Remove conversational text, essays, or philosophical analysis.
3. Output ONLY a clean, simple bulleted list of facts.
4. NEVER use headers, bold text, or introductory/closing remarks."""
try:
response = ollama.chat(model=self.local_model, messages=[
{'role': 'system', 'content': sys_prompt},
{'role': 'user', 'content': f"MEMORY LOG TO COMPRESS:\n{self.persistent_memory}"}
])
compressed_memory = response['message']['content'].strip()
compressed_memory = re.sub(r'.*?', '', compressed_memory, flags=re.DOTALL).strip()
with open(self.memory_file, "w") as f:
f.write(compressed_memory)
self.persistent_memory = compressed_memory
self.rebuild_index()
self.log("[bold green]Persistent memory successfully compressed and re-indexed![/bold green]")
except Exception as e:
self.log(f"[bold red]Memory compression failed: {e}[/bold red]")
def search(self, query: str) -> str:
if not getattr(self, 'vectorstore', None):
return "No long-term memories available."
docs = self.vectorstore.similarity_search(query, k=3)
return "\n".join([f"- {d.page_content}" for d in docs])
def add_interaction(self, user_input: str, bot_response: str):
self.interaction_buffer.append({"user": user_input, "agent": bot_response})
if len(self.interaction_buffer) >= self.COMPRESSION_THRESHOLD:
buffer_to_compress = list(self.interaction_buffer)
self.interaction_buffer = []
threading.Thread(target=self._compress_session, args=(buffer_to_compress,), daemon=True).start()
def _compress_session(self, buffer: List[Dict]):
buffer_text = "\n".join([f"User: {i['user']}\nAgent: {i['agent']}" for i in buffer])
# STRICT PROMPT FOR SESSION COMPRESSION
sys_prompt = """You are a strict summarization script. Merge the recent interactions into the current session summary.
RULES:
1. Keep it brief and objective.
2. DO NOT write essays or analyze the user's intent.
3. Output ONLY the raw text of the updated summary. No conversational padding."""
try:
response = ollama.chat(model=self.local_model, messages=[
{'role': 'system', 'content': sys_prompt},
{'role': 'user', 'content': f"CURRENT SUMMARY:\n{self.session_summary}\n\nNEW INTERACTIONS:\n{buffer_text}"}
])
self.session_summary = response['message']['content'].strip()
self.session_summary = re.sub(r'.*?', '', self.session_summary, flags=re.DOTALL).strip()
except Exception as e:
self.log(f"[dim red]Background session compression failed: {e}[/dim red]")
def finalize_session(self):
self.log("[bold yellow]Extracting long-term memories from session...[/bold yellow]")
final_context = self.session_summary
if self.interaction_buffer:
final_context += "\n" + "\n".join([f"User: {i['user']}\nAgent: {i['agent']}" for i in self.interaction_buffer])
# STRICT PROMPT FOR EXTRACTION
sys_prompt = """You are a strict data extraction pipeline. Your ONLY job is to extract permanent, long-term facts about the user from the provided session text.
RULES:
1. NEVER write conversational text, greetings, headers, or explanations.
2. NEVER write essays, evaluate, or analyze the meaning of the facts.
3. ONLY output a raw, bulleted list of concise facts (e.g., "- User uses Emacs org-mode").
4. If there are NO new permanent facts to save, output EXACTLY and ONLY the word: NONE.
"""
try:
response = ollama.chat(model=self.local_model, messages=[
{'role': 'system', 'content': sys_prompt},
{'role': 'user', 'content': f"SESSION TEXT TO EXTRACT FROM:\n{final_context}"}
])
new_facts = response['message']['content'].strip()
new_facts = re.sub(r'.*?', '', new_facts, flags=re.DOTALL).strip()
if new_facts.upper() != "NONE" and new_facts:
# Failsafe: If the model hallucinates an essay anyway, block it from saving.
if len(new_facts.split('\n')) > 15 or "###" in new_facts:
self.log("[dim red]Model hallucinated an essay instead of facts. Discarding to protect memory database.[/dim red]")
return
with open(self.memory_file, "a") as f:
f.write(f"\n{new_facts}")
self.persistent_memory += f"\n{new_facts}"
self.log("[bold green]New facts appended to long-term memory log![/bold green]")
self.log("[dim]Note: Run /memory rebuild to index these new facts for next time.[/dim]")
else:
self.log("[dim]No new long-term facts detected. Skipping memory append.[/dim]")
except Exception as e:
self.log(f"[bold red]Failed to save long-term memory: {e}[/bold red]")
class PKMManager:
def __init__(self, pkm_dir: str, index_path: str, hash_file: str, embed_model_name: str, log: Callable[[str], None] = print):
self.pkm_dir = pkm_dir
self.index_path = index_path
self.hash_file = hash_file
self.log = log
self.log(f"[dim italic]Waking up Ollama embeddings ({embed_model_name})...[/dim italic]")
self.embeddings = OllamaEmbeddings(model=embed_model_name)
self.text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
self.vectorstore = self._load_or_build()
def _get_main_commit_hash(self) -> str:
try:
result = subprocess.run(
["git", "rev-parse", "main"],
cwd=self.pkm_dir, capture_output=True, text=True, check=True
)
return result.stdout.strip()
except subprocess.CalledProcessError:
return "unknown"
def _load_or_build(self):
self.log("[dim]Checking Git HEAD hash for PKM changes...[/dim]")
current_hash = self._get_main_commit_hash()
if os.path.exists(self.index_path) and os.path.exists(self.hash_file):
with open(self.hash_file, "r") as f:
if f.read().strip() == current_hash:
self.log(f"[green]Git hash unchanged ({current_hash[:7]}). Loading cached PKM index...[/green]")
return FAISS.load_local(self.index_path, self.embeddings, allow_dangerous_deserialization=True)
self.log(f"[bold yellow]New commits detected ({current_hash[:7]}). Rebuilding PKM index...[/bold yellow]")
raw_documents = []
self.log(f"[dim]Scanning {self.pkm_dir} for .org files...[/dim]")
for root, dirs, files in os.walk(self.pkm_dir):
if '.git' in dirs: dirs.remove('.git')
if 'nix' in dirs: dirs.remove('nix')
for file in files:
if file.endswith('.org'):
filepath = os.path.join(root, file)
try:
with open(filepath, 'r', encoding='utf-8') as f:
raw_documents.append(Document(page_content=f.read(), metadata={"source": filepath}))
except Exception:
pass
if not raw_documents:
self.log("[red]No .org files found in PKM directory.[/red]")
return None
self.log(f"[dim]Chunking {len(raw_documents)} documents...[/dim]")
chunks = self.text_splitter.split_documents(raw_documents)
self.log(f"[bold cyan]Embedding {len(chunks)} chunks via Ollama (this might take a minute)...[/bold cyan]")
vectorstore = FAISS.from_documents(chunks, self.embeddings)
vectorstore.save_local(self.index_path)
with open(self.hash_file, "w") as f:
f.write(current_hash)
self.log("[bold green]PKM Index successfully rebuilt and saved![/bold green]")
return vectorstore
def search(self, query: str) -> str:
if not self.vectorstore:
return "PKM is empty."
docs = self.vectorstore.similarity_search(query, k=10)
return "PKM Search Results:\n" + "\n\n".join([f"From {d.metadata['source']}:\n{d.page_content}" for d in docs])
class VisionProcessor:
def __init__(self, local_model: str, log: Callable[[str], None] = print):
self.local_model = local_model
self.log = log
self.log("[dim italic]Vision Processor online...[/dim italic]")
def process(self, image_path: str, user_prompt: str) -> str:
try:
with open(image_path, 'rb') as img_file:
img_bytes = img_file.read()
response = ollama.chat(model=self.local_model, messages=[{
'role': 'user',
'content': f"Describe this image in detail to help another AI answer this prompt: {user_prompt}",
'images': [img_bytes]
}])
return response['message']['content']
except Exception as e:
return f"[Image analysis failed: {e}]"
# ==========================================
# Web Search Providers
# ==========================================
class BaseSearchProvider(ABC):
"""Abstract interface for web search engines to ensure easy swapping and fallbacks."""
@abstractmethod
def search(self, query: str, max_results: int = 10) -> List[Dict[str, str]]:
pass
class GoogleSearchProvider(BaseSearchProvider):
def search(self, query: str, max_results: int = 10) -> List[Dict[str, str]]:
# Imported locally so it doesn't crash the app if the package is missing
from googlesearch import search
results = []
# advanced=True forces it to return objects with title, url, and description
for r in search(query, num_results=max_results, advanced=True):
results.append({
'title': getattr(r, 'title', 'No Title'),
'href': getattr(r, 'url', 'No URL'),
'body': getattr(r, 'description', 'No Description')
})
if not results:
raise Exception("Google returned zero results.")
return results
class DDGSSearchProvider(BaseSearchProvider):
def search(self, query: str, max_results: int = 10) -> List[Dict[str, str]]:
results = DDGS().text(query, max_results=max_results)
if not results:
raise Exception("DuckDuckGo returned zero results.")
formatted_results = []
for r in results:
formatted_results.append({
'title': r.get('title', 'No Title'),
'href': r.get('href', 'No URL'),
'body': r.get('body', 'No Description')
})
return formatted_results
class WebSearcher:
def __init__(self, log: Callable[[str], None] = print):
self.log = log
# The order of this list dictates the fallback priority
self.providers: List[BaseSearchProvider] = [
GoogleSearchProvider(),
DDGSSearchProvider()
]
def search(self, query: str) -> str:
for provider in self.providers:
provider_name = provider.__class__.__name__
try:
self.log(f"[dim italic]Trying {provider_name}...[/dim italic]")
results = provider.search(query, max_results=10)
context = "Web Search Results:\n"
for r in results:
context += f"- Title: {r['title']}\n URL: {r['href']}\n Snippet: {r['body']}\n\n"
return context
except Exception as e:
# Catch 429 Rate Limits, connection errors, or empty results and seamlessly fall back
self.log(f"[dim yellow]{provider_name} failed ({e}). Falling back...[/dim yellow]")
continue
return "Web search failed: All search providers were exhausted or rate-limited."
# ==========================================
# 4. The Orchestrator (Agnostic Agent)
# ==========================================
class CerebralAgent:
def __init__(self, provider: BaseLLMProvider, log: Callable[[str], None] = print):
self.provider = provider
self.log = log
self.log("[bold magenta]Initializing Cerebral Agent Modules...[/bold magenta]")
self.memory = MemoryManager(MEMORY_FILE, MEMORY_INDEX_PATH, LOCAL_LLM, LOCAL_EMBED_MODEL, self.log)
self.pkm = PKMManager(PKM_DIR, FAISS_INDEX_PATH, HASH_TRACKER_FILE, LOCAL_EMBED_MODEL, self.log)
self.vision = VisionProcessor(LOCAL_LLM, self.log)
self.web = WebSearcher(self.log)
def generate_session_filename(self, first_prompt: str, first_response: str) -> str:
self.log("[dim italic]Generating descriptive filename based on prompt and response...[/dim italic]")
hash_input = (first_prompt + first_response).encode('utf-8')
combined_hash = hashlib.sha256(hash_input).hexdigest()[:6]
sys_prompt = "You are a file naming utility. Read the user's prompt and generate a short, descriptive filename base using ONLY lowercase letters and hyphens. Do NOT add an extension. ONLY output the base filename, absolutely no other text. Example: learning-python-basics"
try:
response = ollama.chat(model=LOCAL_LLM, messages=[
{'role': 'system', 'content': sys_prompt},
{'role': 'user', 'content': first_prompt}
])
raw_content = response['message']['content'].strip()
raw_content = re.sub(r'.*?', '', raw_content, flags=re.DOTALL).strip()
lines = [line.strip() for line in raw_content.split('\n') if line.strip()]
raw_filename = lines[-1].lower().replace(' ', '-') if lines else "cerebral-session"
clean_base = re.sub(r'[^a-z0-9\-]', '', raw_filename).strip('-')
clean_base = clean_base[:50].strip('-')
if not clean_base:
clean_base = "cerebral-session"
final_filename = f"{clean_base}-{combined_hash}.org"
return final_filename
except Exception as e:
self.log(f"[dim red]Filename generation failed: {e}. Defaulting.[/dim red]")
return f"cerebral-session-{combined_hash}.org"
def _get_tools(self) -> List[Dict]:
return [
{
"type": "function",
"function": {
"name": "search_pkm",
"description": "Search the user's personal knowledge base (PKM) for notes, code, or org files.",
"parameters": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}
}
},
{
"type": "function",
"function": {
"name": "search_web",
"description": "Search the live internet for current events, external documentation, or facts outside your PKM.",
"parameters": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}
}
}
]
def chat_stream(self, prompt: str, image_path: Optional[str] = None) -> Generator[str, None, str]:
"""Core interaction loop. Yields text chunks. Returns full text when done."""
recent_history = ""
if self.memory.interaction_buffer:
recent_history = "\nRECENT UNCOMPRESSED TURNS:\n" + "\n".join(
[f"User: {i['user']}\nAgent: {i['agent']}" for i in self.memory.interaction_buffer]
)
vision_context = ""
if image_path:
self.log("[dim italic]Analyzing image context...[/dim italic]")
vision_summary = self.vision.process(image_path, prompt)
vision_context = f"\n[USER ATTACHED AN IMAGE. Local Vision Summary: {vision_summary}]\n"
self.log("[dim italic]Querying long-term memory (Ollama Embeddings)...[/dim italic]")
relevant_memories = self.memory.search(prompt)
current_time = datetime.datetime.now().strftime("%A, %B %d, %Y at %I:%M %p")
system_prompt = f"""You are a highly capable AI assistant.
CRITICAL OUTPUT FORMATTING:
You MUST output your responses EXCLUSIVELY in Emacs org-mode format. Use org-mode headings, lists, and LaTeX fragments for math.
FORMATTING RULES:
1. NEVER use double asterisks (`**`) for bolding. You MUST use SINGLE asterisks for bold emphasis (e.g., *this is bold*). Double asterisks will break the parser.
2. Cite your sources inline using proper org-mode link syntax. For web searches, use [[url][Description]]. For PKM files, use [[file:/path/to/file.org][Filename]].
3. At the very end of your response, you MUST append a Level 1 heading `* Sources` and neatly list all the search results and PKM documents you referenced using proper org-mode syntax.
CURRENT TIME AND DATE:
{current_time}
RESPONSE STYLE GUIDELINES:
- Provide EXTREMELY detailed, exhaustive, and comprehensive answers.
- Write in long-form prose. Do not be brief; expand deeply on concepts.
- Use multiple paragraphs, deep conceptual explanations, and thorough analysis.
RELEVANT LONG-TERM MEMORIES:
{relevant_memories}
COMPRESSED SESSION CONTEXT: {self.memory.session_summary}
{recent_history}
"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt + vision_context}
]
self.log("[dim italic]Analyzing intent & tool requirements (Cerebras)...[/dim italic]")
# --- NEW: Self-Healing Tool Call Loop ---
MAX_RETRIES = 3
valid_tool_calls = False
response_message = None
allowed_tool_names = [t["function"]["name"] for t in self._get_tools()]
for attempt in range(MAX_RETRIES):
pre_flight = self.provider.chat_completion(messages=messages, tools=self._get_tools(), stream=False)
response_message = pre_flight.choices[0].message
# Scenario A: Hallucinated Markdown Tool Call
if not response_message.tool_calls and response_message.content and "**name**:" in response_message.content:
self.log(f"[dim yellow]Model hallucinated Markdown tool call. Retrying ({attempt+1}/{MAX_RETRIES})...[/dim yellow]")
error_msg = f"ERROR: You attempted to call a tool using Markdown text. You MUST use the native JSON tool calling API. Allowed tools: {allowed_tool_names}"
messages.append({"role": "assistant", "content": response_message.content})
messages.append({"role": "user", "content": error_msg})
continue
# Scenario B: Legitimate text response (No tools needed)
if not response_message.tool_calls:
valid_tool_calls = True
break
# Scenario C: Native API Tool Calls (Needs Validation)
has_errors = False
error_feedbacks = []
for tool_call in response_message.tool_calls:
func_name = tool_call.function.name
call_error = None
if func_name not in allowed_tool_names:
has_errors = True
call_error = f"Tool '{func_name}' does not exist. Allowed tools: {allowed_tool_names}"
else:
try:
json.loads(tool_call.function.arguments)
except json.JSONDecodeError:
has_errors = True
call_error = f"Arguments for '{func_name}' are not valid JSON: {tool_call.function.arguments}"
error_feedbacks.append(call_error)
if has_errors:
self.log(f"[dim yellow]Malformed tool call detected. Retrying ({attempt+1}/{MAX_RETRIES})...[/dim yellow]")
# Append the bad tool call to history so it learns what it did wrong
assistant_msg = {
"role": "assistant",
"content": response_message.content or "",
"tool_calls": [
{
"id": t.id,
"type": "function",
"function": {"name": t.function.name, "arguments": t.function.arguments}
} for t in response_message.tool_calls
]
}
messages.append(assistant_msg)
# Append the specific errors as API tool responses
for i, tool_call in enumerate(response_message.tool_calls):
err = error_feedbacks[i]
msg_content = f"ERROR: {err}" if err else "Error: Another tool in this batch failed. Please fix the batch and retry."
messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": msg_content})
continue
# Scenario D: Valid Tool Calls
valid_tool_calls = True
break
# Failsafe: If it fails 3 times, wipe the tool calls to force a graceful text degradation
if not valid_tool_calls:
self.log("[bold red]Failed to generate valid tool calls. Proceeding without tools.[/bold red]")
response_message.tool_calls = None
# ----------------------------------------
if not response_message.tool_calls:
self.log("[dim italic]No tools needed. Outputting response...[/dim italic]")
content = response_message.content or ""
yield content
self.memory.add_interaction(prompt, content)
return content
# --- Execute Validated Tools ---
assistant_msg = {
"role": "assistant",
"content": response_message.content or "",
"tool_calls": [
{
"id": t.id,
"type": "function",
"function": {"name": t.function.name, "arguments": t.function.arguments}
} for t in response_message.tool_calls
]
}
messages.append(assistant_msg)
for tool_call in response_message.tool_calls:
func_name = tool_call.function.name
args = json.loads(tool_call.function.arguments) # Guaranteed to be safe now
if func_name == "search_pkm":
q = args.get("query", prompt)
self.log(f"[cyan]🧠 Tool Call: Searching PKM for '{q}'...[/cyan]")
yield f"\n*(Agent Note: Searched PKM for `{q}`)*\n\n"
result = self.pkm.search(q)
elif func_name == "search_web":
q = args.get("query", prompt)
self.log(f"[cyan]🌐 Tool Call: Searching Web for '{q}'...[/cyan]")
yield f"\n*(Agent Note: Searched Web for `{q}`)*\n\n"
result = self.web.search(q)
messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": result})
messages.append({
"role": "system",
"content": "Tool results received. Now provide your final, comprehensive answer in strict org-mode. REMEMBER: Use *single asterisks* for bold, NEVER double asterisks."
})
self.log("[dim italic]Streaming final response...[/dim italic]")
stream = self.provider.chat_completion(messages=messages, tools=self._get_tools(), stream=True, tool_choice="none")
full_response = ""
for chunk in stream:
content = chunk.choices[0].delta.content or ""
full_response += content
yield content
self.memory.add_interaction(prompt, full_response)
return full_response
def shutdown(self):
self.memory.finalize_session()
# ==========================================
# 5. The CLI Presentation Layer
# ==========================================
class CLIApp:
def __init__(self, agent: CerebralAgent, console: Console):
self.agent = agent
self.console = console
self.current_session_file = None
def run(self):
self.console.print(Panel.fit("🤖 [bold blue]Modular Cerebral Agent[/bold blue] initialized.\n- Type [bold]/image /path/to/img.png [/bold] to attach images.\n- Type [bold]/exit[/bold] to quit.", border_style="blue"))
while True:
try:
user_input = Prompt.ask("\n[bold magenta]You[/bold magenta]")
if user_input.lower() == '/memory count':
count = self.agent.memory.get_line_count()
self.console.print(f"[bold cyan]Persistent Memory Lines:[/bold cyan] {count}")
continue
if user_input.lower() == '/memory rebuild':
self.agent.memory.rebuild_index()
continue
if user_input.lower() == '/memory compress':
self.agent.memory.compress_persistent_memory()
continue
if clean_input == '/memory':
help_text = (
"[bold cyan]/memory count[/bold cyan] : Print the number of lines in persistent memory.\n"
"[bold cyan]/memory rebuild[/bold cyan] : Manually reserialize the FAISS database from the log.\n"
"[bold cyan]/memory compress[/bold cyan] : Use the local LLM to scrub duplicates and compress the log."
)
self.console.print(Panel.fit(help_text, title="🧠 Memory Commands", border_style="cyan"))
continue
if user_input.lower() in ['/exit', '/quit']:
self.console.print("\n[dim italic]Initiating shutdown sequence...[/dim italic]")
self.agent.shutdown()
self.console.print("[bold red]Exiting...[/bold red]")
break
if not user_input.strip():
continue
image_path = None
prompt = user_input
if user_input.startswith("/image "):
parts = user_input.split(" ", 2)
if len(parts) >= 2:
image_path = parts[1]
prompt = parts[2] if len(parts) > 2 else "What is this?"
self.console.print(f"[dim italic]Processing image locally...[/dim italic]")
self.console.print("[bold green]Agent:[/bold green]")
if not self.current_session_file:
full_response = ""
for chunk in self.agent.chat_stream(prompt, image_path=image_path):
print(chunk, end="", flush=True)
full_response += chunk
print("\n")
generated_name = self.agent.generate_session_filename(prompt, full_response)
self.current_session_file = os.path.join(ORG_OUTPUT_DIR, generated_name)
self.console.print(f"[bold green]Session log created at:[/bold green] [cyan]{self.current_session_file}[/cyan]")
with open(self.current_session_file, "w") as f:
f.write(f"* User Prompt: {user_input}\n** Response\n{full_response}\n")
try:
self.console.print("[dim italic]Triggering emacsclient...[/dim italic]")
subprocess.run(
["emacsclient", "-n", self.current_session_file],
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
except Exception as e:
self.console.print(f"[dim red]Failed to trigger emacsclient: {e}[/dim red]")
else:
full_response = ""
with open(self.current_session_file, "a") as f:
f.write(f"\n* User Prompt: {user_input}\n** Response\n")
for chunk in self.agent.chat_stream(prompt, image_path=image_path):
print(chunk, end="", flush=True)
f.write(chunk)
full_response += chunk
f.write("\n")
print("\n")
except KeyboardInterrupt:
self.console.print("\n[bold red]Interrupted. Saving memories...[/bold red]")
self.agent.shutdown()
break
except Exception as e:
self.console.print(f"[bold red]An error occurred: {e}[/bold red]")
# ==========================================
# 6. Entry Point
# ==========================================
if __name__ == "__main__":
console = Console()
try:
provider = CerebrasProvider()
except ValueError as e:
console.print(f"[bold red]Configuration Error: {e}[/bold red]")
sys.exit(1)
with console.status("[bold green]Booting up systems...[/bold green]", spinner="dots") as status:
agent = CerebralAgent(provider=provider, log=console.print)
app = CLIApp(agent, console)
app.run()