"""Homelab Butler – Unified API proxy for Pfannkuchen homelab. Reads credentials from Vaultwarden cache (synced by host cron) with flat-file fallback.""" import os, json, asyncio, logging import httpx from fastapi import FastAPI, Request, HTTPException, Depends from fastapi.responses import JSONResponse from contextlib import asynccontextmanager log = logging.getLogger("butler") API_DIR = os.environ.get("API_KEY_DIR", "/data/api") VAULT_CACHE_DIR = os.environ.get("VAULT_CACHE_DIR", "/data/vault-cache") BUTLER_TOKEN = os.environ.get("BUTLER_TOKEN", "") # --- Credential cache --- _vault_cache: dict[str, str] = {} def _load_vault_cache(): """Load vault items from disk cache (written by host-side vault-sync.sh).""" global _vault_cache if not os.path.isdir(VAULT_CACHE_DIR): log.info(f"No vault cache at {VAULT_CACHE_DIR}") return new = {} for f in os.listdir(VAULT_CACHE_DIR): path = os.path.join(VAULT_CACHE_DIR, f) if os.path.isfile(path): new[f] = open(path).read().strip() _vault_cache = new log.info(f"Loaded {len(new)} vault items from cache") async def _periodic_cache_reload(): """Reload vault cache every 5 minutes (host cron writes new files).""" while True: await asyncio.sleep(300) _load_vault_cache() @asynccontextmanager async def lifespan(app: FastAPI): _load_vault_cache() task = asyncio.create_task(_periodic_cache_reload()) yield task.cancel() app = FastAPI(title="Homelab Butler", version="2.0.0", lifespan=lifespan) # --- Credential reading (vault-first, file-fallback) --- def _read(name): """Read credential: vault cache first, then flat file.""" # Vault cache uses lowercase-hyphenated names vault_name = name.lower().replace("_", "-") if vault_name in _vault_cache: return _vault_cache[vault_name] # Try uppercase convention upper = name.upper().replace("-", "_").lower().replace("_", "-") if upper in _vault_cache: return _vault_cache[upper] # Fallback to flat file try: return open(f"{API_DIR}/{name}").read().strip() except FileNotFoundError: return None def _parse_kv(name): raw = _read(name) if not raw: return {} d = {} for line in raw.splitlines(): if ":" in line: k, v = line.split(":", 1) d[k.strip().lower()] = v.strip() return d def _parse_url_key(name): raw = _read(name) if not raw: return None, None lines = [l.strip() for l in raw.splitlines() if l.strip()] return (lines[0] if lines else None, lines[1] if len(lines) > 1 else None) # --- Service configs --- SERVICES = { "dockhand": {"url": "http://10.4.1.116:3000", "auth": "session", "vault_key": "dockhand_password"}, "sonarr": {"url": "http://10.2.1.100:8989", "auth": "apikey", "key_file": "sonarr", "vault_key": "sonarr_uhd_key"}, "sonarr1080p": {"url": None, "auth": "apikey_urlfile", "key_file": "sonarr1080p"}, "radarr": {"url": "http://10.2.1.100:7878", "auth": "apikey", "key_file": "radarr", "vault_key": "radarr_uhd_key"}, "radarr1080p": {"url": None, "auth": "apikey_urlfile", "key_file": "radarr1080p"}, "seerr": {"url": "http://10.2.1.100:5055", "auth": "apikey", "key_file": "seer", "vault_key": "seerr_api_key"}, "outline": {"url": "http://10.1.1.100:3000", "auth": "bearer", "key_file": "outline", "vault_key": "outline_api_key"}, "n8n": {"url": "http://10.4.1.113:5678", "auth": "n8n", "key_file": "n8n", "vault_key": "n8n_api_key"}, "proxmox": {"url": "https://10.5.85.11:8006", "auth": "proxmox"}, "homeassistant": {"url": "http://10.10.1.20:8123", "auth": "bearer", "key_file": "homeassistent", "vault_key": "ha_token"}, "grafana": {"url": "http://10.1.1.111:3000", "auth": "bearer", "key_file": "grafana", "vault_key": "grafana_api_key"}, "uptime": {"url": "http://159.69.245.190:3001", "auth": "bearer", "key_file": "uptime", "vault_key": "uptime_api_key"}, "waha": {"url": "http://10.4.1.110:3500", "auth": "apikey", "key_file": "waha_api_key", "vault_key": "waha_api_key"}, "forgejo": {"url": "http://10.4.1.116:3001", "auth": "bearer", "key_file": "forgejo", "vault_key": "forgejo_token"}, "semaphore": {"url": "http://10.4.1.116:8090", "auth": "bearer", "key_file": "semaphore", "vault_key": "semaphore_token"}, } # --- Dockhand session --- _dockhand_cookie = None async def _dockhand_login(client): global _dockhand_cookie r = await client.post( f"{SERVICES['dockhand']['url']}/api/auth/login", json={"username": "admin", "password": _read("dockhand") or ""}, ) if r.status_code == 200: _dockhand_cookie = dict(r.cookies) return _dockhand_cookie # --- Auth --- def _verify(request: Request): if not BUTLER_TOKEN: return auth = request.headers.get("authorization", "") if auth != f"Bearer {BUTLER_TOKEN}": raise HTTPException(401, "Invalid token") def _get_key(cfg): vault_key = cfg.get("vault_key") if vault_key and vault_key in _vault_cache: return _vault_cache[vault_key] return _read(cfg.get("key_file", "")) # --- Routes --- @app.get("/") async def root(): return {"service": "homelab-butler", "version": "2.0.0", "services": list(SERVICES.keys()), "vault_items": len(_vault_cache)} @app.get("/health") async def health(): return {"status": "ok", "vault_items": len(_vault_cache)} @app.post("/vault/reload") async def vault_reload(_=Depends(_verify)): _load_vault_cache() return {"reloaded": True, "items": len(_vault_cache)} # --- VM Lifecycle Endpoints --- from pydantic import BaseModel import subprocess as _sp AUTOMATION1 = "sascha@10.5.85.5" ISO_BUILDER = "/app-config/ansible/iso-builder/build-iso.sh" class VMCreate(BaseModel): node: int ip: str hostname: str cores: int = 2 memory: int = 4096 disk: int = 32 def _ssh(host, cmd, timeout=600): r = _sp.run(["ssh","-o","ConnectTimeout=10","-o","StrictHostKeyChecking=accept-new",host,cmd], capture_output=True, text=True, timeout=timeout) return r.returncode, r.stdout, r.stderr def _pve_auth(): pv = _parse_kv("proxmox") return f"PVEAPIToken={pv.get('tokenid','')}={pv.get('secret','')}" @app.get("/vm/list") async def vm_list(_=Depends(_verify)): auth = _pve_auth() vms = [] async with httpx.AsyncClient(verify=False, timeout=15) as c: nodes = await c.get("https://10.5.85.11:8006/api2/json/nodes", headers={"Authorization": auth}) for n in nodes.json().get("data", []): r = await c.get(f"https://10.5.85.11:8006/api2/json/nodes/{n['node']}/qemu", headers={"Authorization": auth}) for vm in r.json().get("data", []): vm["node"] = n["node"] vms.append(vm) return vms @app.post("/vm/create") async def vm_create(req: VMCreate, _=Depends(_verify)): steps = [] # Step 1: Build ISO + create VM via iso-builder on automation1 cmd = f"{ISO_BUILDER} --node {req.node} --ip {req.ip} --hostname {req.hostname} --cores {req.cores} --memory {req.memory} --disk {req.disk} --password 'GT500r8' --create-vm" rc, out, err = _ssh(AUTOMATION1, f"cd /app-config/ansible/iso-builder && {cmd}", timeout=300) if rc != 0: return JSONResponse({"error": "iso-builder failed", "stderr": err[-500:], "stdout": out[-500:]}, status_code=500) steps.append("iso-builder: ok") # Step 2: Wait for SSH (up to 6 min) ok = False for _ in range(36): try: rc2, out2, _ = _ssh(f"sascha@{req.ip}", "hostname", timeout=10) if rc2 == 0: ok = True steps.append(f"ssh: {out2.strip()} reachable") break except Exception: pass await asyncio.sleep(10) if not ok: return JSONResponse({"error": "SSH timeout", "steps": steps}, status_code=504) # Step 2.5: Add to Ansible inventory ini = "/app-config/ansible/pfannkuchen.ini" group = getattr(req, 'group', 'auto') inv_cmd = f"""python3 -c " lines = open('{ini}').readlines() if not any('{req.hostname} ' in l for l in lines): out = [] found = False for l in lines: out.append(l) if l.strip() == '[auto]': found = True elif found and (l.startswith('[') or l.strip() == ''): out.insert(-1, '{req.hostname} ansible_host={req.ip}\\n') found = False if found: out.append('{req.hostname} ansible_host={req.ip}\\n') open('{ini}','w').writelines(out) print('added') else: print('exists') " """ _ssh(AUTOMATION1, inv_cmd, timeout=30) _ssh(AUTOMATION1, f"mkdir -p /app-config/ansible/host_vars/{req.hostname} && printf 'ansible_host: {req.ip}\\nansible_user: sascha\\n' > /app-config/ansible/host_vars/{req.hostname}/vars.yml", timeout=30) _ssh(AUTOMATION1, f"ssh-keygen -f /home/sascha/.ssh/known_hosts -R {req.ip} 2>/dev/null; ssh -o StrictHostKeyChecking=accept-new sascha@{req.ip} hostname 2>/dev/null", timeout=30) steps.append("inventory: added") # Step 3: Ansible base setup via direct SSH (reliable fallback) rc3, _, err3 = _ssh(AUTOMATION1, f"cd /app-config/ansible && bash pfannkuchen.sh setup {req.hostname}", timeout=600) steps.append(f"ansible: {'ok' if rc3 == 0 else 'failed (rc=' + str(rc3) + ')'}") return {"status": "ok" if rc3 == 0 else "partial", "hostname": req.hostname, "ip": req.ip, "node": req.node, "steps": steps} @app.get("/vm/status/{vmid}") async def vm_status(vmid: int, _=Depends(_verify)): auth = _pve_auth() async with httpx.AsyncClient(verify=False, timeout=10) as c: nodes = await c.get("https://10.5.85.11:8006/api2/json/nodes", headers={"Authorization": auth}) for n in nodes.json().get("data", []): r = await c.get(f"https://10.5.85.11:8006/api2/json/nodes/{n['node']}/qemu/{vmid}/status/current", headers={"Authorization": auth}) if r.status_code == 200: return r.json().get("data", {}) return JSONResponse({"error": "VM not found"}, status_code=404) @app.delete("/vm/{vmid}") async def vm_delete(vmid: int, _=Depends(_verify)): auth = _pve_auth() async with httpx.AsyncClient(verify=False, timeout=30) as c: nodes = await c.get("https://10.5.85.11:8006/api2/json/nodes", headers={"Authorization": auth}) for n in nodes.json().get("data", []): r = await c.delete(f"https://10.5.85.11:8006/api2/json/nodes/{n['node']}/qemu/{vmid}", headers={"Authorization": auth}) if r.status_code == 200: return r.json() return JSONResponse({"error": "VM not found"}, status_code=404) @app.post("/inventory/host") async def inventory_host(request: Request, _=Depends(_verify)): body = await request.json() name, ip = body["name"], body["ip"] group = body.get("group", "auto") user = body.get("user", "sascha") ini = "/app-config/ansible/pfannkuchen.ini" # Add host to group in pfannkuchen.ini (idempotent) add_cmd = f"""python3 -c " lines = open('{ini}').readlines() # Check if host already exists if any('{name} ' in l or '{name}\\n' in l for l in lines): print('already exists') else: # Find the group and insert after it out, found = [], False for l in lines: out.append(l) if l.strip() == '[{group}]': found = True elif found and (l.startswith('[') or l.strip() == ''): out.insert(-1, '{name} ansible_host={ip}\\n') found = False if found: # group was last out.append('{name} ansible_host={ip}\\n') open('{ini}','w').writelines(out) print('added to [{group}]') " """ rc, out, _ = _ssh(AUTOMATION1, add_cmd, timeout=30) # Also create host_vars _ssh(AUTOMATION1, f"mkdir -p /app-config/ansible/host_vars/{name} && printf 'ansible_host: {ip}\\nansible_user: {user}\\n' > /app-config/ansible/host_vars/{name}/vars.yml", timeout=30) return {"status": "ok", "name": name, "ip": ip, "group": group, "result": out.strip()} @app.post("/ansible/run") async def ansible_run(request: Request, _=Depends(_verify)): body = await request.json() hostname = body.get("limit", body.get("hostname", "")) template_id = body.get("template_id", 10) if not hostname: return JSONResponse({"error": "limit/hostname required"}, status_code=400) rc, out, err = _ssh(AUTOMATION1, f"cd /app-config/ansible && bash pfannkuchen.sh setup {hostname}", timeout=600) return {"status": "ok" if rc == 0 else "error", "rc": rc, "output": out[-1000:]} @app.get("/ansible/status/{job_id}") async def ansible_status(job_id: int, _=Depends(_verify)): return {"info": "direct SSH mode - no async job tracking"} # --- TTS Endpoints --- class TTSRequest(BaseModel): text: str target: str = "speaker" # "speaker" or "telegram" voice: str = "deep_thought.mp3" language: str = "de" SPEAKER_URL = "http://10.10.1.166:10800" CHATTERBOX_URL = "http://10.2.1.104:8004/tts" @app.post("/tts/speak") async def tts_speak(req: TTSRequest, _=Depends(_verify)): if req.target == "speaker": async with httpx.AsyncClient(verify=False, timeout=120) as c: r = await c.post(SPEAKER_URL, json={"text": req.text}) return {"status": "ok" if r.status_code == 200 else "error", "target": "speaker"} elif req.target == "telegram": # Generate WAV via Chatterbox, save to hermes VM as OGG for Telegram voice async with httpx.AsyncClient(verify=False, timeout=120) as c: r = await c.post(CHATTERBOX_URL, json={ "text": req.text, "voice_mode": "clone", "reference_audio_filename": req.voice, "output_format": "wav", "language": req.language, "exaggeration": 0.3, "cfg_weight": 0.7, "temperature": 0.6, }) if r.status_code != 200: return JSONResponse({"error": "chatterbox failed"}, status_code=500) # Save WAV and convert to OGG on hermes import tempfile wav_path = tempfile.mktemp(suffix=".wav") ogg_path = "/tmp/trulla_voice.ogg" with open(wav_path, "wb") as f: f.write(r.content) rc, _, _ = _ssh("sascha@10.4.1.100", f"rm -f {ogg_path}", timeout=10) # Copy WAV to hermes and convert _sp.run(["scp", "-o", "ConnectTimeout=5", wav_path, f"sascha@10.4.1.100:/tmp/trulla_voice.wav"], timeout=30) _ssh("sascha@10.4.1.100", f"ffmpeg -y -i /tmp/trulla_voice.wav -c:a libopus -b:a 64k {ogg_path} 2>/dev/null", timeout=30) os.unlink(wav_path) return {"status": "ok", "target": "telegram", "media_path": ogg_path, "hint": "Use MEDIA:/tmp/trulla_voice.ogg in response"} else: return JSONResponse({"error": f"unknown target: {req.target}"}, status_code=400) @app.get("/tts/voices") async def tts_voices(_=Depends(_verify)): async with httpx.AsyncClient(verify=False, timeout=10) as c: r = await c.get("http://10.2.1.104:8004/get_predefined_voices") return r.json() @app.get("/tts/health") async def tts_health(_=Depends(_verify)): results = {} async with httpx.AsyncClient(verify=False, timeout=5) as c: try: r = await c.get(SPEAKER_URL) results["speaker"] = r.json() except Exception as e: results["speaker"] = {"status": "offline", "error": str(e)} try: r = await c.get("http://10.2.1.104:8004/api/model-info") results["chatterbox"] = "ok" except Exception as e: results["chatterbox"] = {"status": "offline", "error": str(e)} return results @app.api_route("/{service}/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"]) async def proxy(service: str, path: str, request: Request, _=Depends(_verify)): SKIP_SERVICES = {"vm", "inventory", "ansible", "debug", "tts"} if service in SKIP_SERVICES: raise HTTPException(404, f"Unknown service: {service}") cfg = SERVICES.get(service) if not cfg: raise HTTPException(404, f"Unknown service: {service}. Available: {list(SERVICES.keys())}") base_url = cfg["url"] auth_type = cfg["auth"] headers = dict(request.headers) cookies = {} for h in ["host", "content-length", "transfer-encoding", "authorization"]: headers.pop(h, None) if auth_type == "apikey": headers["X-Api-Key"] = _get_key(cfg) or "" elif auth_type == "apikey_urlfile": url, key = _parse_url_key(cfg["key_file"]) base_url = url.rstrip("/") if url else "" headers["X-Api-Key"] = key or "" elif auth_type == "bearer": headers["Authorization"] = f"Bearer {_get_key(cfg)}" elif auth_type == "n8n": headers["X-N8N-API-KEY"] = _get_key(cfg) or "" elif auth_type == "proxmox": pv = _parse_kv("proxmox") headers["Authorization"] = f"PVEAPIToken={pv.get('tokenid', '')}={pv.get('secret', '')}" elif auth_type == "session": global _dockhand_cookie if not _dockhand_cookie: async with httpx.AsyncClient(verify=False) as c: await _dockhand_login(c) cookies = _dockhand_cookie or {} target = f"{base_url}/{path}" body = await request.body() async with httpx.AsyncClient(verify=False, timeout=30.0) as client: resp = await client.request(method=request.method, url=target, headers=headers, cookies=cookies, content=body) if auth_type == "session" and resp.status_code == 401: _dockhand_cookie = None await _dockhand_login(client) resp = await client.request(method=request.method, url=target, headers=headers, cookies=_dockhand_cookie or {}, content=body) try: data = resp.json() except Exception: data = resp.text return JSONResponse(content=data, status_code=resp.status_code)