# UniversalAI / app.py
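"""Gradio chat app that can propose and apply edits to its own files.

A local GGUF model (via llama-cpp-python) chats normally; when the user asks
for a code change or a new file, the app generates a full-file proposal,
shows a unified diff, and waits for an explicit yes/no before writing anything.
"""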
import os, time, json, re, difflib, tempfile, pathlib, shutil, fnmatch
import gradio as gr
from llama_cpp import Llama
MODEL_REPO = "QuantFactory/Phi-3.5-mini-instruct-GGUF"
MODEL_FILE = "Phi-3.5-mini-instruct.Q4_K_M.gguf"
APP_TITLE = "Natural-language self-editing AI (CPU)"
SAVE_PATH = "convos.jsonl"
MAX_NEW_TOKENS = 768
N_CTX = 4096
THREADS = 4
# Globals
llm = None
ROOT_DIR = pathlib.Path(".").resolve()
BACKUP_DIR = ROOT_DIR / ".fs_backups"
BACKUP_DIR.mkdir(exist_ok=True)
DENYLIST = ["/proc", "/sys", "/dev", "/run", "/var/lib/docker", "/var/run"]
# Pending action shape:
# {"path": "app.py", "new_content": "...", "old_content": "...", "reason": "…"}
# Stored in gr.State
PENDING_KEY = "pending_action"
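# All filesystem helpers below resolve paths relative to ROOT_DIR and refuse
# any path whose resolved form falls under a DENYLIST prefix.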
def _resolve(path: str) -> pathlib.Path:
    p = (ROOT_DIR / path).resolve()
    for d in DENYLIST:
        if str(p).startswith(d):
            raise PermissionError(f"Path {p} is denied")
    return p

def make_backup(path: str) -> str:
    p = _resolve(path)
    if not p.exists():
        return ""
    ts = int(time.time())
    rel = str(p.relative_to(ROOT_DIR)).replace("/", "_")
    bk = BACKUP_DIR / f"{rel}.{ts}.bak"
    if p.is_file():
        shutil.copy2(p, bk)
        return str(bk)
    else:
        shutil.make_archive(str(bk), "zip", root_dir=str(p))
        return str(bk) + ".zip"

def read_file(path: str) -> str:
    p = _resolve(path)
    with open(p, "r", encoding="utf-8") as f:
        return f.read()

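# write_atomic: back up the current file, write the new content to a temp
# file in the same directory, then os.replace() it over the target so a
# crash mid-write never leaves a truncated file behind.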
def write_atomic(path: str, content: str) -> str:
    p = _resolve(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    backup_path = make_backup(path)
    with tempfile.NamedTemporaryFile("w", delete=False, dir=str(p.parent), encoding="utf-8") as tmp:
        tmp.write(content)
        tmp_name = tmp.name
    os.replace(tmp_name, p)
    return backup_path

def list_paths(pattern: str = "**/*", cwd: str = ".") -> list:
    base = _resolve(cwd)
    results = []
    for path in base.rglob("*"):
        rel = str(path.relative_to(ROOT_DIR))
        if fnmatch.fnmatch(rel, pattern):
            results.append(rel + ("/" if path.is_dir() else ""))
    return results[:1000]

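# list_paths example: list_paths("**/*.py") returns paths relative to
# ROOT_DIR (directories get a trailing "/"); results are capped at 1000.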
def file_diff_text(old: str, new: str, from_name: str, to_name: str) -> str:
    diff = difflib.unified_diff(
        old.splitlines(), new.splitlines(), fromfile=from_name, tofile=to_name
    )
    return "\n".join(diff)

def save_turn(system, history, user_msg, assistant_msg):
    try:
        with open(SAVE_PATH, "a", encoding="utf-8") as f:
            rec = {
                "ts": time.time(),
                "system": system,
                "history": history,
                "user": user_msg,
                "assistant": assistant_msg,
            }
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")
    except Exception:
        pass

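# Lazy singleton: the GGUF weights are downloaded from the Hugging Face Hub
# on first use and kept in memory for the lifetime of the process.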
def get_llm():
    global llm
    if llm is not None:
        return llm
    llm = Llama.from_pretrained(
        repo_id=MODEL_REPO,
        filename=MODEL_FILE,
        n_ctx=N_CTX,
        n_threads=THREADS,
        n_gpu_layers=0,
        verbose=False,
    )
    return llm

CODE_BLOCK_RE = re.compile(r"```(?:[\w.+-]+)?\n(.*?)```", re.DOTALL)

def extract_fenced(text: str) -> str:
    m = CODE_BLOCK_RE.search(text)
    return m.group(1).strip() if m else text

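# extract_fenced example: extract_fenced("here\n```python\nx = 1\n```")
# returns "x = 1"; if no fenced block is found, the raw text is returned.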
def detect_intent(user_text: str) -> str:
    """
    Returns: "edit", "create", or "chat".
    """
    t = user_text.lower()
    edit_verbs = ["edit", "change", "modify", "refactor", "fix", "optimize", "speed up", "rework", "rewrite", "patch"]
    create_verbs = ["create", "make a new", "add a new", "generate a new", "build a new", "scaffold"]
    if any(v in t for v in edit_verbs):
        return "edit"
    if any(v in t for v in create_verbs):
        return "create"
    # Heuristic: mentions of specific files imply edit
    if re.search(r"\b[\w\-/]+\.py\b", t):
        return "edit"
    return "chat"

def find_target_files(user_text: str) -> list:
    """
    Pull explicit filenames from the message; fall back to app.py if none.
    """
    files = re.findall(r"([\w\-/]+\.py)\b", user_text)
    files = [f for f in files if (ROOT_DIR / f).exists()]
    if files:
        return files
    # Sensible default
    fallback = ["app.py"] if (ROOT_DIR / "app.py").exists() else []
    return fallback

def propose_edit_or_create(user_text: str) -> dict:
    """
    Ask the model to propose a full-file replacement (for edit) or a new file (for create).
    Returns: {"path": str, "new_content": str, "reason": str}
    """
    targets = find_target_files(user_text)
    context_blobs = []
    for path in targets:
        try:
            context_blobs.append(f"File: {path}\n```python\n{read_file(path)}\n```")
        except Exception:
            pass
    file_list_preview = "\n".join(list_paths("**/*.py"))
    context_text = "\n\n".join(context_blobs) if context_blobs else "(no existing file context)"
    system_hint = (
        "You are a precise software editor. When asked to change or create code, "
        "you return ONLY the complete target file in a single fenced code block, and a brief reason."
    )
    user_prompt = f"""User request:
{user_text}

Existing Python files (truncated):
```text
{file_list_preview}
```

Context for existing targets:
{context_text}
"""
    out = get_llm().create_chat_completion(
        messages=[{"role": "system", "content": system_hint},
                  {"role": "user", "content": user_prompt}],
        temperature=0.2,
        top_p=0.9,
        max_tokens=MAX_NEW_TOKENS,
    )
    text = out["choices"][0]["message"]["content"]
    # Extract the JSON header, if the model emitted one
    json_match = re.search(r"\{.*\}", text, flags=re.DOTALL)
    header = {"path": "app.generated.py", "reason": "Generated update"}
    if json_match:
        try:
            header = json.loads(json_match.group(0))
        except Exception:
            pass
    new_content = extract_fenced(text)
    return {
        "path": header.get("path", "app.generated.py"),
        "new_content": new_content,
        "reason": header.get("reason", "Proposed change"),
    }

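# propose_edit_or_create tolerates models that skip the JSON header: if no
# {"path": ..., "reason": ...} object parses, the fallback path/reason
# defaults are used and only the fenced code block is taken.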
def propose_diff_message(path: str, old: str, new: str, reason: str) -> str:
    diff = file_diff_text(old, new, f"{path} (old)", f"{path} (new)")
    preview = diff if diff.strip() else "(no textual differences)"
    return (
        f"I proposed changes to {path}:\n"
        f"Reason: {reason}\n"
        f"Diff:\n```diff\n{preview}\n```\n"
        f"Apply these changes? Say 'yes' to apply, or 'no' to cancel. "
        f"(You can also say 'edit the proposal' to iterate.)"
    )

def apply_pending(pending: dict) -> str:
    path = pending["path"]
    new_content = pending["new_content"]
    try:
        backup = write_atomic(path, new_content)
        return f"Applied changes to {path}. Backup: {backup or 'none'}"
    except Exception as e:
        # On failure, offer to save to an alternative path
        alt = f"{path}.failed.{int(time.time())}.txt"
        try:
            write_atomic(alt, new_content)
            return f"Failed to write {path}: {e}\nSaved proposed content to {alt}"
        except Exception as e2:
            return f"Failed to apply and to save alt copy: {e2}"

def natural_yes(text: str) -> bool:
    return text.strip().lower() in {"y", "yes", "apply", "do it", "ok", "okay", "sure", "confirm"}

def natural_no(text: str) -> bool:
    t = text.strip().lower()
    return t in {"n", "no", "cancel", "stop", "reject", "discard"}

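# Confirmation uses exact-phrase matching, so "yes please" is treated as
# feedback on the proposal rather than approval; this keeps accidental
# applies unlikely.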
def format_messages(system, history, user_msg):
    msgs = []
    if system.strip():
        msgs.append({"role": "system", "content": system})
    for h in history:
        msgs.append({"role": h["role"], "content": h["content"]})
    msgs.append({"role": "user", "content": user_msg})
    return msgs

def stream_chat_response(user_msg, history, system, temperature, top_p, max_new_tokens):
    llm = get_llm()
    msgs = format_messages(system, history, user_msg)
    stream = llm.create_chat_completion(
        messages=msgs,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_new_tokens,
        stream=True,
    )
    partial = ""
    for chunk in stream:
        delta = chunk["choices"][0]["delta"]
        if "content" in delta and delta["content"] is not None:
            partial += delta["content"]
            yield partial

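# Each yield is the cumulative response so far, so the UI re-renders the
# growing assistant message on every streamed chunk.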
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}\nTalk normally. Ask for changes or new files; I’ll propose a patch, show a diff, and wait for your yes/no.")
    with gr.Row():
        system = gr.Textbox(label="System prompt", value="You are a helpful, precise, and concise assistant.")
    with gr.Row():
        temperature = gr.Slider(0.0, 1.5, value=0.4, step=0.05, label="Temperature")
        top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="Top‑p")
        max_new_tokens = gr.Slider(64, 2048, value=MAX_NEW_TOKENS, step=16, label="Max new tokens")
    chat = gr.Chatbot(height=520, show_copy_button=True, type="messages")
    user = gr.Textbox(label="Your message", placeholder="Ask anything… e.g., 'Optimize the memory recall code' or 'Create scripts/logger.py that logs messages'")
    send = gr.Button("Send", variant="primary")
    state = gr.State({PENDING_KEY: None})
    def respond(message, chat_history, system, temperature, top_p, max_new_tokens, state_obj):
        if not message or not message.strip():
            yield gr.update(), chat_history, state_obj
            return
        # If there's a pending action, check for yes/no first
        pending = state_obj.get(PENDING_KEY)
        if pending is not None:
            if natural_yes(message):
                result = apply_pending(pending)
                state_obj[PENDING_KEY] = None
                new_hist = (chat_history or []) + [
                    {"role": "user", "content": message},
                    {"role": "assistant", "content": result},
                ]
                yield gr.update(value=new_hist), new_hist, state_obj
                return
            elif natural_no(message):
                state_obj[PENDING_KEY] = None
                new_hist = (chat_history or []) + [
                    {"role": "user", "content": message},
                    {"role": "assistant", "content": "Okay, discarded the proposed change."},
                ]
                yield gr.update(value=new_hist), new_hist, state_obj
                return
            # Neither yes nor no: treat as iteration and regenerate the proposal with the feedback
            msg = f"Updating the proposal with your feedback: {message}\nRe‑proposing…"
            history_msgs = (chat_history or []) + [{"role": "assistant", "content": msg}]
            # Merge the feedback into a new proposal prompt
            merged_request = pending.get("reason", "") + "\n\nAdditional feedback: " + message
            proposal = propose_edit_or_create(merged_request)
            path = proposal["path"]
            try:
                old = read_file(path)
            except Exception:
                old = ""
            diff_msg = propose_diff_message(path, old, proposal["new_content"], proposal["reason"])
            # Stash the new pending action
            state_obj[PENDING_KEY] = {
                "path": path,
                "new_content": proposal["new_content"],
                "old_content": old,
                "reason": proposal["reason"],
            }
            new_hist = history_msgs + [{"role": "assistant", "content": diff_msg}]
            yield gr.update(value=new_hist), new_hist, state_obj
            return
        # No pending action: decide intent
        intent = detect_intent(message)
        if intent in ("edit", "create"):
            proposal = propose_edit_or_create(message)
            path = proposal["path"]
            try:
                old = read_file(path)
            except Exception:
                old = ""
            diff_msg = propose_diff_message(path, old, proposal["new_content"], proposal["reason"])
            # Stash the pending action
            state_obj[PENDING_KEY] = {
                "path": path,
                "new_content": proposal["new_content"],
                "old_content": old,
                "reason": proposal["reason"],
            }
            new_hist = (chat_history or []) + [
                {"role": "user", "content": message},
                {"role": "assistant", "content": diff_msg},
            ]
            yield gr.update(value=new_hist), new_hist, state_obj
            return
        # Plain chat with streaming
        history_msgs = chat_history or []
        bot_text = ""
        for partial in stream_chat_response(message, history_msgs, system, temperature, top_p, max_new_tokens):
            bot_text = partial
            new_hist = history_msgs + [
                {"role": "user", "content": message},
                {"role": "assistant", "content": bot_text},
            ]
            yield gr.update(value=new_hist), new_hist, state_obj
        # Save the completed turn once streaming ends
        save_turn(system, history_msgs, message, bot_text)
    send.click(
        respond,
        [user, chat, system, temperature, top_p, max_new_tokens, state],
        [chat, chat, state],
    )
    user.submit(
        respond,
        [user, chat, system, temperature, top_p, max_new_tokens, state],
        [chat, chat, state],
    )
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)