import os, time, json, re, difflib, tempfile, pathlib, shutil, fnmatch

import gradio as gr
from llama_cpp import Llama

MODEL_REPO = "QuantFactory/Phi-3.5-mini-instruct-GGUF"
MODEL_FILE = "Phi-3.5-mini-instruct.Q4_K_M.gguf"
APP_TITLE = "Natural-language self-editing AI (CPU)"
SAVE_PATH = "convos.jsonl"
MAX_NEW_TOKENS = 768
N_CTX = 4096
THREADS = 4

# Globals
llm = None
ROOT_DIR = pathlib.Path(".").resolve()
BACKUP_DIR = ROOT_DIR / ".fs_backups"
BACKUP_DIR.mkdir(exist_ok=True)
DENYLIST = ["/proc", "/sys", "/dev", "/run", "/var/lib/docker", "/var/run"]
| {"path": "app.py", "newcontent": "...", "oldcontent": "...", "reason": "…"} | |
| Stored in gr.State | |
| PENDINGKEY = "pendingaction" | |

def _resolve(path: str) -> pathlib.Path:
    p = (ROOT_DIR / path).resolve()
    for d in DENYLIST:
        if str(p).startswith(d):
            raise PermissionError(f"Path {p} is denied")
    return p
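# Note: resolve() follows symlinks before the check, and the guard is a plain
# string-prefix test, so an entry like "/run" also blocks e.g. "/runtime".
# Comparing whole path components would be stricter; kept simple here.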

def make_backup(path: str) -> str:
    p = _resolve(path)
    if not p.exists():
        return ""
    ts = int(time.time())
    rel = str(p.relative_to(ROOT_DIR)).replace("/", "_")
    bk = BACKUP_DIR / f"{rel}.{ts}.bak"
    if p.is_file():
        shutil.copy2(p, bk)
        return str(bk)
    else:
        shutil.make_archive(str(bk), "zip", root_dir=str(p))
        return str(bk) + ".zip"
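# Directory backups go through shutil.make_archive, which appends the ".zip"
# suffix itself; hence the different return value for directories.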

def read_file(path: str) -> str:
    p = _resolve(path)
    with open(p, "r", encoding="utf-8") as f:
        return f.read()

def write_atomic(path: str, content: str) -> str:
    p = _resolve(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    backup_path = make_backup(path)
    with tempfile.NamedTemporaryFile("w", delete=False, dir=str(p.parent), encoding="utf-8") as tmp:
        tmp.write(content)
        tmp_name = tmp.name
    os.replace(tmp_name, p)
    return backup_path
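# The temp file is created in the destination directory, so os.replace() is a
# same-filesystem rename; on POSIX that rename is atomic, and readers see
# either the old file or the complete new one, never a partial write.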

def list_paths(pattern: str = "**/*", cwd: str = ".") -> list:
    base = _resolve(cwd)
    results = []
    for path in base.rglob("*"):
        rel = str(path.relative_to(ROOT_DIR))
        if fnmatch.fnmatch(rel, pattern):
            results.append(rel + ("/" if path.is_dir() else ""))
    return results[:1000]

def file_diff_text(old: str, new: str, from_name: str, to_name: str) -> str:
    # splitlines() produces newline-free lines, so pass lineterm="" to keep
    # the ---/+++/@@ header lines newline-free as well (per the difflib docs)
    diff = difflib.unified_diff(
        old.splitlines(), new.splitlines(), fromfile=from_name, tofile=to_name, lineterm=""
    )
    return "\n".join(diff)

def save_turn(system, history, user_msg, assistant_msg):
    try:
        with open(SAVE_PATH, "a", encoding="utf-8") as f:
            rec = {
                "ts": time.time(),
                "system": system,
                "history": history,
                "user": user_msg,
                "assistant": assistant_msg,
            }
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")
    except Exception:
        pass

def get_llm():
    global llm
    if llm is not None:
        return llm
    llm = Llama.from_pretrained(
        repo_id=MODEL_REPO,
        filename=MODEL_FILE,
        n_ctx=N_CTX,
        n_threads=THREADS,
        n_gpu_layers=0,
        verbose=False,
    )
    return llm
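# Llama.from_pretrained fetches the GGUF file from the Hugging Face Hub on
# first use and caches it locally; n_gpu_layers=0 keeps inference on the CPU.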

CODE_BLOCK_RE = re.compile(r"```(?:[\w.+-]+)?\n(.*?)```", re.DOTALL)

def extract_fenced(text: str) -> str:
    m = CODE_BLOCK_RE.search(text)
    return m.group(1).strip() if m else text
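# e.g. extract_fenced("ok:\n```python\nprint('hi')\n```") == "print('hi')";
# when no fenced block is present, the raw text is returned unchanged.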

def detect_intent(user_text: str) -> str:
    """
    Returns: "edit", "create", or "chat"
    """
    t = user_text.lower()
    edit_verbs = ["edit", "change", "modify", "refactor", "fix", "optimize", "speed up", "rework", "rewrite", "patch"]
    create_verbs = ["create", "make a new", "add a new", "generate a new", "build a new", "scaffold"]
    if any(v in t for v in edit_verbs):
        return "edit"
    if any(v in t for v in create_verbs):
        return "create"
    # Heuristic: mentions of specific files imply edit
    if re.search(r"\b[\w\-/]+\.py\b", t):
        return "edit"
    return "chat"

def find_target_files(user_text: str) -> list:
    """
    Pull explicit filenames from the message; fall back to app.py if none.
    """
    files = re.findall(r"([\w\-/]+\.py)\b", user_text)
    files = [f for f in files if (ROOT_DIR / f).exists()]
    if files:
        return files
    # Sensible default
    fallback = ["app.py"] if (ROOT_DIR / "app.py").exists() else []
    return fallback
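# e.g. "speed up scripts/worker.py" yields ["scripts/worker.py"] when that
# file exists under ROOT_DIR; otherwise the app.py fallback applies.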

def propose_edit_or_create(user_text: str) -> dict:
    """
    Ask the model to propose a full-file replacement (for edit) or a new file (for create).
    Returns: {"path": str, "new_content": str, "reason": str}
    """
    targets = find_target_files(user_text)
    context_blobs = []
    for path in targets:
        try:
            context_blobs.append(f"File: {path}\n```python\n{read_file(path)}\n```")
        except Exception:
            pass
    file_list_preview = "\n".join(list_paths("**/*.py"))
    system_hint = (
        "You are a precise software editor. When asked to change or create code, "
        "you return ONLY the complete target file in a single fenced code block, and a brief reason."
    )
    # Precomputed so the f-string below contains no backslash inside an
    # expression (a SyntaxError before Python 3.12)
    context_text = "\n\n".join(context_blobs) if context_blobs else "(no existing file context)"
    user_prompt = f"""User request:
{user_text}

Existing Python files (truncated):
```text
{file_list_preview}
```

Context for existing targets:
{context_text}
"""
    out = get_llm().create_chat_completion(
        messages=[{"role": "system", "content": system_hint},
                  {"role": "user", "content": user_prompt}],
        temperature=0.2,
        top_p=0.9,
        max_tokens=MAX_NEW_TOKENS,
    )
    text = out["choices"][0]["message"]["content"]
    # Extract an optional JSON header (best-effort; fall back to defaults)
    json_match = re.search(r"\{.*\}", text, flags=re.DOTALL)
    header = {"path": "app.generated.py", "reason": "Generated update"}
    if json_match:
        try:
            header = json.loads(json_match.group(0))
        except Exception:
            pass
    new_content = extract_fenced(text)
    return {
        "path": header.get("path", "app.generated.py"),
        "new_content": new_content,
        "reason": header.get("reason", "Proposed change"),
    }
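# Caveat: the greedy {.*} match may grab braces from the generated code rather
# than a header; json.loads failures simply fall back to the defaults above.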

def propose_diff_message(path: str, old: str, new: str, reason: str) -> str:
    diff = file_diff_text(old, new, f"{path} (old)", f"{path} (new)")
    preview = diff if diff.strip() else "(no textual differences)"
    return (
        f"I proposed changes to {path}:\n"
        f"Reason: {reason}\n"
        f"Diff:\n```diff\n{preview}\n```\n"
        f"Apply these changes? Say 'yes' to apply, or 'no' to cancel. "
        f"(You can also say 'edit the proposal' to iterate.)"
    )

def apply_pending(pending: dict) -> str:
    path = pending["path"]
    new_content = pending["new_content"]
    try:
        backup = write_atomic(path, new_content)
        return f"Applied changes to {path}. Backup: {backup or 'none'}"
    except Exception as e:
        # On failure, save the proposed content to an alternative path
        alt = f"{path}.failed.{int(time.time())}.txt"
        try:
            write_atomic(alt, new_content)
            return f"Failed to write {path}: {e}\nSaved proposed content to {alt}"
        except Exception as e2:
            return f"Failed to apply and to save alt copy: {e2}"

def natural_yes(text: str) -> bool:
    return text.strip().lower() in {"y", "yes", "apply", "do it", "ok", "okay", "sure", "confirm"}

def natural_no(text: str) -> bool:
    t = text.strip().lower()
    return t in {"n", "no", "cancel", "stop", "reject", "discard"}
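# These matchers gate the actual write: apply_pending() runs only after an
# explicit natural_yes(); any other reply either cancels or iterates on the
# proposal, so nothing touches disk without confirmation.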

def format_messages(system, history, user_msg):
    msgs = []
    if system.strip():
        msgs.append({"role": "system", "content": system})
    for h in history:
        msgs.append({"role": h["role"], "content": h["content"]})
    msgs.append({"role": "user", "content": user_msg})
    return msgs

def stream_chat_response(user_msg, history, system, temperature, top_p, max_new_tokens):
    llm = get_llm()
    msgs = format_messages(system, history, user_msg)
    stream = llm.create_chat_completion(
        messages=msgs,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_new_tokens,
        stream=True,
    )
    partial = ""
    for chunk in stream:
        delta = chunk["choices"][0]["delta"]
        if "content" in delta and delta["content"] is not None:
            partial += delta["content"]
            yield partial
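# Chunks follow the OpenAI-style delta format; yielding the accumulated text
# (not just the new piece) lets the caller re-render the growing reply as-is.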

with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}\nTalk normally. Ask for changes or new files; I’ll propose a patch, show a diff, and wait for your yes/no.")
    with gr.Row():
        system = gr.Textbox(label="System prompt", value="You are a helpful, precise, and concise assistant.")
    with gr.Row():
        temperature = gr.Slider(0.0, 1.5, value=0.4, step=0.05, label="Temperature")
        top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="Top-p")
        max_new_tokens = gr.Slider(64, 2048, value=MAX_NEW_TOKENS, step=16, label="Max new tokens")

    chat = gr.Chatbot(height=520, show_copy_button=True, type="messages")
    user = gr.Textbox(label="Your message", placeholder="Ask anything… e.g., 'Optimize the memory recall code' or 'Create scripts/logger.py that logs messages'")
    send = gr.Button("Send", variant="primary")
    state = gr.State({PENDING_KEY: None})

    def respond(message, chat_history, system, temperature, top_p, max_new_tokens, state_obj):
        # respond() is a generator (the streaming branch yields), so every
        # branch must yield its outputs rather than return them
        if not message or not message.strip():
            yield chat_history or [], state_obj
            return

        # If there's a pending action, check for yes/no
        pending = state_obj.get(PENDING_KEY)
        if pending is not None:
            if natural_yes(message):
                result = apply_pending(pending)
                state_obj[PENDING_KEY] = None
                new_hist = (chat_history or []) + [
                    {"role": "user", "content": message},
                    {"role": "assistant", "content": result},
                ]
                yield new_hist, state_obj
                return
            elif natural_no(message):
                state_obj[PENDING_KEY] = None
                new_hist = (chat_history or []) + [
                    {"role": "user", "content": message},
                    {"role": "assistant", "content": "Okay, discarded the proposed change."},
                ]
                yield new_hist, state_obj
                return
            # Neither yes nor no: treat as iteration, regenerating the proposal with the feedback
            msg = f"Updating the proposal with your feedback: {message}\nRe-proposing…"
            history_msgs = (chat_history or []) + [
                {"role": "user", "content": message},
                {"role": "assistant", "content": msg},
            ]
            # Merge feedback into a new proposal prompt by appending to the stored reason
            merged_request = pending.get("reason", "") + "\n\nAdditional feedback: " + message
            proposal = propose_edit_or_create(merged_request)
            path = proposal["path"]
            try:
                old = read_file(path)
            except Exception:
                old = ""
            diff_msg = propose_diff_message(path, old, proposal["new_content"], proposal["reason"])
            # Stash the new pending action
            state_obj[PENDING_KEY] = {
                "path": path,
                "new_content": proposal["new_content"],
                "old_content": old,
                "reason": proposal["reason"],
            }
            yield history_msgs + [{"role": "assistant", "content": diff_msg}], state_obj
            return

        # No pending action: decide intent
        intent = detect_intent(message)
        if intent in ("edit", "create"):
            proposal = propose_edit_or_create(message)
            path = proposal["path"]
            try:
                old = read_file(path)
            except Exception:
                old = ""
            diff_msg = propose_diff_message(path, old, proposal["new_content"], proposal["reason"])
            # Stash the pending action until the user confirms
            state_obj[PENDING_KEY] = {
                "path": path,
                "new_content": proposal["new_content"],
                "old_content": old,
                "reason": proposal["reason"],
            }
            yield (chat_history or []) + [
                {"role": "user", "content": message},
                {"role": "assistant", "content": diff_msg},
            ], state_obj
            return

        # Plain chat with streaming
        history_msgs = chat_history or []
        bot_text = ""
        for partial in stream_chat_response(message, history_msgs, system, temperature, top_p, max_new_tokens):
            bot_text = partial
            yield history_msgs + [
                {"role": "user", "content": message},
                {"role": "assistant", "content": bot_text},
            ], state_obj
        # Save the finished turn once streaming ends
        save_turn(system, history_msgs, message, bot_text)

    send.click(
        respond,
        [user, chat, system, temperature, top_p, max_new_tokens, state],
        [chat, state],
    )
    user.submit(
        respond,
        [user, chat, system, temperature, top_p, max_new_tokens, state],
        [chat, state],
    )

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)