import os, time, json, re, difflib, tempfile, pathlib, shutil, fnmatch import gradio as gr from llama_cpp import Llama MODEL_REPO = "QuantFactory/Phi-3.5-mini-instruct-GGUF" MODELFILE = "Phi-3.5-mini-instruct.Q4K_M.gguf" APP_TITLE = "Natural-language self-editing AI (CPU)" SAVE_PATH = "convos.jsonl" MAXNEWTOKENS = 768 N_CTX = 4096 THREADS = 4 Globals llm = None ROOT_DIR = pathlib.Path(".").resolve() BACKUPDIR = ROOTDIR / ".fs_backups" BACKUPDIR.mkdir(existok=True) DENYLIST = ["/proc", "/sys", "/dev", "/run", "/var/lib/docker", "/var/run"] {"path": "app.py", "newcontent": "...", "oldcontent": "...", "reason": "…"} Stored in gr.State PENDINGKEY = "pendingaction" def _resolve(path: str) -> pathlib.Path: p = (ROOT_DIR / path).resolve() for d in DENYLIST: if str(p).startswith(d): raise PermissionError(f"Path {p} is denied") return p def make_backup(path: str) -> str: p = _resolve(path) if not p.exists(): return "" ts = int(time.time()) rel = str(p.relativeto(ROOTDIR)).replace("/", "") bk = BACKUP_DIR / f"{rel}.{ts}.bak" if p.is_file(): shutil.copy2(p, bk) return str(bk) else: shutil.makearchive(str(bk), "zip", rootdir=str(p)) return str(bk) + ".zip" def read_file(path: str) -> str: p = _resolve(path) with open(p, "r", encoding="utf-8") as f: return f.read() def write_atomic(path: str, content: str) -> str: p = _resolve(path) p.parent.mkdir(parents=True, exist_ok=True) backuppath = makebackup(path) with tempfile.NamedTemporaryFile("w", delete=False, dir=str(p.parent), encoding="utf-8") as tmp: tmp.write(content) tmp_name = tmp.name os.replace(tmp_name, p) return backup_path def list_paths(pattern: str = "/*", cwd: str = ".") -> list: base = _resolve(cwd) results = [] for path in base.rglob("*"): rel = str(path.relativeto(ROOTDIR)) if fnmatch.fnmatch(rel, pattern): results.append(rel + ("/" if path.is_dir() else "")) return results[:1000] def filedifftext(old: str, new: str, fromname: str, toname: str) -> str: diff = difflib.unified_diff( old.splitlines(), new.splitlines(), fromfile=fromname, tofile=toname ) return "\n".join(diff) def saveturn(system, history, usermsg, assistant_msg): try: with open(SAVE_PATH, "a", encoding="utf-8") as f: rec = { "ts": time.time(), "system": system, "history": history, "user": user_msg, "assistant": assistant_msg, } f.write(json.dumps(rec, ensure_ascii=False) + "\n") except Exception: pass def get_llm(): global llm if llm is not None: return llm llm = Llama.from_pretrained( repoid=MODELREPO, filename=MODEL_FILE, nctx=NCTX, n_threads=THREADS, ngpulayers=0, verbose=False ) return llm CODEBLOCKRE = re.compile(r"(?:[\w.+-]+)?\n(.*?)", re.DOTALL) def extract_fenced(text: str) -> str: m = CODEBLOCKRE.search(text) return m.group(1).strip() if m else text def detectintent(usertext: str) -> str: """ Returns: "edit", "create", "chat" """ t = user_text.lower() edit_verbs = ["edit", "change", "modify", "refactor", "fix", "optimize", "speed up", "rework", "rewrite", "patch"] create_verbs = ["create", "make a new", "add a new", "generate a new", "build a new", "scaffold"] if any(v in t for v in edit_verbs): return "edit" if any(v in t for v in create_verbs): return "create" # Heuristic: mentions of specific files imply edit if re.search(r"\b[\w\-/]+\.py\b", t): return "edit" return "chat" def findtargetfiles(user_text: str) -> list: """ Pull explicit filenames from the message; fallback to app.py if none. """ files = re.findall(r"([\w\-/]+\.py)\b", user_text) files = [f for f in files if (ROOT_DIR / f).exists()] if files: return files # sensible default fallback = ["app.py"] if (ROOT_DIR / "app.py").exists() else [] return fallback def proposeeditorcreate(usertext: str) -> dict: """ Ask the model to propose a full-file replacement (for edit) or a new file (for create). Returns: {"path": str, "new_content": str, "reason": str} """ targets = findtargetfiles(user_text) context_blobs = [] for path in targets: try: contextblobs.append(f"File: {path}\npython\n{readfile(path)}\n") except Exception: pass filelistpreview = "\n".join(list_paths("/*.py")) system_hint = ( "You are a precise software editor. When asked to change or create code, " "you return ONLY the complete target file in a single fenced code block, and a brief reason." ) user_prompt = User request: {user_text} Existing Python files (truncated): `text {filelistpreview} ` Context for existing targets: {("\n\n".join(contextblobs) if contextblobs else "(no existing file context)")} out = getllm().createchat_completion( messages=[{"role": "system", "content": system_hint}, {"role": "user", "content": user_prompt}], temperature=0.2, top_p=0.9, maxtokens=MAXNEW_TOKENS ) text = out["choices"][0]["message"]["content"] # Extract JSON header json_match = re.search(r"\{.*\}", text, flags=re.DOTALL) header = {"path": "app.generated.py", "reason": "Generated update"} if json_match: try: header = json.loads(json_match.group(0)) except Exception: pass newcontent = extractfenced(text) return { "path": header.get("path", "app.generated.py"), "newcontent": newcontent, "reason": header.get("reason", "Proposed change"), } def proposediffmessage(path: str, old: str, new: str, reason: str) -> str: diff = filedifftext(old, new, f"{path} (old)", f"{path} (new)") preview = diff if diff.strip() else "(no textual differences)" return ( f"I proposed changes to {path}:\n" f"Reason: {reason}\n" f"Diff:\ndiff\n{preview}\n\n" f"Apply these changes? Say 'yes' to apply, or 'no' to cancel. " f"(You can also say 'edit the proposal' to iterate.)" ) def apply_pending(pending: dict) -> str: path = pending["path"] newcontent = pending["newcontent"] oldcontent = pending["oldcontent"] try: backup = writeatomic(path, newcontent) return f"Applied changes to {path}. Backup: {backup or 'none'}" except Exception as e: # On failure, offer to save to an alternative path alt = f"{path}.failed.{int(time.time())}.txt" try: writeatomic(alt, newcontent) return f"Failed to write {path}: {e}\nSaved proposed content to {alt}" except Exception as e2: return f"Failed to apply and to save alt copy: {e2}" def natural_yes(text: str) -> bool: return text.strip().lower() in {"y", "yes", "apply", "do it", "ok", "okay", "sure", "confirm"} def natural_no(text: str) -> bool: t = text.strip().lower() return t in {"n", "no", "cancel", "stop", "reject", "discard"} def formatmessages(system, history, usermsg): msgs = [] if system.strip(): msgs.append({"role": "system", "content": system}) for h in history: msgs.append({"role": h["role"], "content": h["content"]}) msgs.append({"role": "user", "content": user_msg}) return msgs def streamchatresponse(usermsg, history, system, temperature, topp, maxnewtokens): llm = get_llm() msgs = formatmessages(system, history, usermsg) stream = llm.createchatcompletion( messages=msgs, temperature=temperature, topp=topp, maxtokens=maxnew_tokens, stream=True ) partial = "" for chunk in stream: delta = chunk["choices"][0]["delta"] if "content" in delta and delta["content"] is not None: piece = delta["content"] partial += piece yield partial return with gr.Blocks(title=APP_TITLE) as demo: gr.Markdown(f"# {APP_TITLE}\nTalk normally. Ask for changes or new files; I’ll propose a patch, show a diff, and wait for your yes/no.") with gr.Row(): system = gr.Textbox(label="System prompt", value="You are a helpful, precise, and concise assistant.") with gr.Row(): temperature = gr.Slider(0.0, 1.5, value=0.4, step=0.05, label="Temperature") top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="Top‑p") maxnewtokens = gr.Slider(64, 2048, value=MAXNEWTOKENS, step=16, label="Max new tokens") chat = gr.Chatbot(height=520, showcopybutton=True, type="messages") user = gr.Textbox(label="Your message", placeholder="Ask anything… e.g., 'Optimize the memory recall code' or 'Create scripts/logger.py that logs messages'") send = gr.Button("Send", variant="primary") state = gr.State({PENDING_KEY: None}) def respond(message, chathistory, system, temperature, topp, maxnewtokens, state_obj): if not message or not message.strip(): return gr.update(), chathistory, stateobj # If there's a pending action, check for yes/no pending = stateobj.get(PENDINGKEY) if pending is not None: if natural_yes(message): result = apply_pending(pending) stateobj[PENDINGKEY] = None newhist = (chathistory or []) + [ {"role": "user", "content": message}, {"role": "assistant", "content": result}, ] return gr.update(value=newhist), newhist, state_obj elif natural_no(message): stateobj[PENDINGKEY] = None newhist = (chathistory or []) + [ {"role": "user", "content": message}, {"role": "assistant", "content": "Okay, discarded the proposed change."}, ] return gr.update(value=newhist), newhist, state_obj # If neither yes/no, treat as iteration: regenerate proposal using the user's feedback msg = f"Updating the proposal with your feedback: {message}\nRe‑proposing…" historymsgs = (chathistory or []) + [{"role": "assistant", "content": msg}] # Merge feedback into a new proposal prompt by appending to user_text merged_request = pending.get("reason", "") + "\n\nAdditional feedback: " + message proposal = proposeeditorcreate(mergedrequest) path = proposal["path"] try: old = read_file(path) except Exception: old = "" diffmsg = proposediffmessage(path, old, proposal["newcontent"], proposal["reason"]) # Stash new pending stateobj[PENDINGKEY] = { "path": path, "newcontent": proposal["newcontent"], "old_content": old, "reason": proposal["reason"], } newhist = historymsgs + [{"role": "assistant", "content": diff_msg}] return gr.update(value=newhist), newhist, state_obj # No pending: decide intent intent = detect_intent(message) if intent in ("edit", "create"): proposal = proposeeditor_create(message) path = proposal["path"] try: old = read_file(path) except Exception: old = "" diffmsg = proposediffmessage(path, old, proposal["newcontent"], proposal["reason"]) # Stash pending stateobj[PENDINGKEY] = { "path": path, "newcontent": proposal["newcontent"], "old_content": old, "reason": proposal["reason"], } newhist = (chathistory or []) + [ {"role": "user", "content": message}, {"role": "assistant", "content": diff_msg}, ] return gr.update(value=newhist), newhist, state_obj # Plain chat with streaming historymsgs = chathistory or [] bot_text = "" for partial in streamchatresponse(message, historymsgs, system, temperature, topp, maxnewtokens): bot_text = partial yield gr.update(value=(history_msgs + [ {"role": "user", "content": message}, {"role": "assistant", "content": bot_text} ])), (history_msgs + [ {"role": "user", "content": message}, {"role": "assistant", "content": bot_text} ]), state_obj # Save last turn once streaming ends saveturn(system, historymsgs, message, bot_text) send.click( respond, [user, chat, system, temperature, topp, maxnew_tokens, state], [chat, chat, state], ) user.submit( respond, [user, chat, system, temperature, topp, maxnew_tokens, state], [chat, chat, state], ) if name == "main": demo.launch(servername="0.0.0.0", serverport=7860)