Current Signal Context
Historical Score vs BTC Price
Score Bracket Performance
| Score Range | Label | Days | Avg 30d | Avg 90d | Avg 180d | Avg 1yr | Win Rate (1yr) | Max Gain (1yr) | Max Loss (1yr) | Avg Max DD |
|---|
#!/usr/bin/env python3 """ Bitcoin Accumulation Zone Monitor — Web Dashboard FastAPI server with inline HTML/CSS/JS dashboard. Monitors on-chain metrics to identify optimal BTC accumulation zones. """ import asyncio import json import logging import os import sys import threading import time import traceback from datetime import datetime, timezone import requests from fastapi import FastAPI from fastapi.responses import HTMLResponse, JSONResponse from pydantic import BaseModel logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s") log = logging.getLogger("btc-monitor") BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, BASE_DIR) from scrapers import fear_greed, price from scoring import engine app = FastAPI(title="Bitcoin Accumulation Zone Monitor") CONFIG_DIR = os.path.join(BASE_DIR, "config") DATA_DIR = os.path.join(BASE_DIR, "data") CACHE_PATH = os.path.join(DATA_DIR, "cache.json") HISTORY_PATH = os.path.join(DATA_DIR, "score_history.jsonl") LLM_SETTINGS_PATH = os.path.join(CONFIG_DIR, "llm_settings.json") os.makedirs(DATA_DIR, exist_ok=True) # Background scraper state _scraper_lock = threading.Lock() _scraper_running = False _last_update = None _last_error = None # ── Cache management ────────────────────────────────────────────────────── def load_cache(): if os.path.exists(CACHE_PATH): try: with open(CACHE_PATH) as f: return json.load(f) except Exception: pass return {} def save_cache(data): with open(CACHE_PATH, "w") as f: json.dump(data, f, indent=2, default=str) def append_history(score_data): """Append a daily score entry to history.""" entry = { "timestamp": datetime.now(timezone.utc).isoformat(), "composite_score": score_data.get("composite_score", 0), "scored_count": score_data.get("scored_count", 0), "metrics": { m["key"]: {"score": m["score"], "value": m["value"]} for m in score_data.get("metrics", []) }, } with open(HISTORY_PATH, "a") as f: f.write(json.dumps(entry) + "\n") def load_history(): if not os.path.exists(HISTORY_PATH): return [] entries = [] with open(HISTORY_PATH) as f: for line in f: line = line.strip() if line: try: entries.append(json.loads(line)) except Exception: pass return entries # ── Background scraper ──────────────────────────────────────────────────── def run_scrape(): """Run a full scrape cycle and update cache.""" global _last_update, _last_error, _scraper_running with _scraper_lock: if _scraper_running: return _scraper_running = True try: log.info("Starting scrape cycle...") metrics = {} # 1. Fear & Greed (fast API call) log.info("Fetching Fear & Greed...") metrics["fear_greed"] = fear_greed.fetch() # 2. BTC Price data (fast API calls) log.info("Fetching BTC price...") price_current = price.fetch_current() metrics["price"] = price_current log.info("Fetching BTC ATH...") ath_data = price.fetch_ath() if price_current.get("price") and ath_data.get("ath"): drawdown = price.calculate_drawdown(price_current["price"], ath_data["ath"]) metrics["drawdown"] = {"value": drawdown, "ath": ath_data["ath"]} else: metrics["drawdown"] = {"value": None} log.info("Fetching historical prices...") hist = price.fetch_historical() if hist: sma_200d = price.calculate_200d_sma(hist) mayer = price.calculate_mayer_multiple(price_current.get("price"), sma_200d) metrics["price_extras"] = {"sma_200d": sma_200d, "mayer_multiple": mayer} # 3. On-chain metrics via Playwright (slow) log.info("Scraping on-chain metrics from LookIntoBitcoin...") try: from scrapers import lookintobitcoin onchain = lookintobitcoin.scrape_all() metrics.update(onchain) except Exception as e: log.error("LookIntoBitcoin scraping failed: %s\n%s", e, traceback.format_exc()) _last_error = f"On-chain scraping failed: {e}" # 4. Score everything log.info("Scoring metrics...") scored = engine.score_all(metrics) metrics["_scored"] = scored metrics["_timestamp"] = datetime.now(timezone.utc).isoformat() save_cache(metrics) append_history(scored) _last_update = datetime.now(timezone.utc).isoformat() _last_error = None log.info("Scrape cycle complete. Composite score: %s", scored["composite_score"]) except Exception as e: log.error("Scrape cycle error: %s\n%s", e, traceback.format_exc()) _last_error = str(e) finally: with _scraper_lock: _scraper_running = False def scraper_loop(): """Background loop that runs scrape every 15 minutes.""" while True: run_scrape() time.sleep(900) # 15 minutes # Start background scraper on import _scraper_thread = threading.Thread(target=scraper_loop, daemon=True) _scraper_thread.start() # ── LLM Settings (preserved from original) ─────────────────────────────── class LLMSettingsUpdate(BaseModel): provider: str model: str providers: dict class TestConnectionRequest(BaseModel): provider: str providers: dict class FetchModelsRequest(BaseModel): provider: str providers: dict def _load_llm_settings(): if os.path.exists(LLM_SETTINGS_PATH): with open(LLM_SETTINGS_PATH) as f: return json.load(f) return { "provider": "ollama", "model": "qwen3.5:27b", "providers": { "ollama": {"base_url": "http://100.100.242.21:11434"}, "lmstudio": {"base_url": "http://100.100.242.21:1234"}, "openai": {"api_key": ""}, "anthropic": {"api_key": ""}, "openrouter": {"api_key": ""}, }, } def _mask_api_key(key): if not key or len(key) < 8: return "" return "••••••••" + key[-4:] def _safe_settings(settings): out = json.loads(json.dumps(settings)) for name, cfg in out.get("providers", {}).items(): if "api_key" in cfg: cfg["api_key"] = _mask_api_key(cfg["api_key"]) return out def _merge_api_keys(new_providers, existing_providers): for name, cfg in new_providers.items(): if "api_key" in cfg: masked = cfg["api_key"] if masked.startswith("••••") or masked == "": existing_key = existing_providers.get(name, {}).get("api_key", "") cfg["api_key"] = existing_key def _fetch_models(provider, providers): cfg = providers.get(provider, {}) if provider == "ollama": base_url = cfg.get("base_url", "http://100.100.242.21:11434") resp = requests.get(f"{base_url}/api/tags", timeout=10) resp.raise_for_status() return [{"id": m["name"], "name": m["name"]} for m in resp.json().get("models", [])] elif provider == "lmstudio": base_url = cfg.get("base_url", "http://100.100.242.21:1234") resp = requests.get(f"{base_url}/v1/models", timeout=10) resp.raise_for_status() return [{"id": m["id"], "name": m["id"]} for m in resp.json().get("data", [])] elif provider == "openai": api_key = cfg.get("api_key", "") if not api_key: raise ValueError("OpenAI API key is required") resp = requests.get("https://api.openai.com/v1/models", headers={"Authorization": f"Bearer {api_key}"}, timeout=15) resp.raise_for_status() models = [m for m in resp.json().get("data", []) if m["id"].startswith("gpt-")] models.sort(key=lambda m: m["id"]) return [{"id": m["id"], "name": m["id"]} for m in models] elif provider == "anthropic": api_key = cfg.get("api_key", "") if not api_key: raise ValueError("Anthropic API key is required") resp = requests.get("https://api.anthropic.com/v1/models", headers={"x-api-key": api_key, "anthropic-version": "2023-06-01"}, timeout=15) resp.raise_for_status() return [{"id": m["id"], "name": m.get("display_name", m["id"])} for m in resp.json().get("data", [])] elif provider == "openrouter": resp = requests.get("https://openrouter.ai/api/v1/models", timeout=15) resp.raise_for_status() models = resp.json().get("data", []) models.sort(key=lambda m: m.get("id", "")) return [{"id": m["id"], "name": m.get("name", m["id"])} for m in models[:200]] else: raise ValueError(f"Unknown provider: {provider}") # ── API Routes ──────────────────────────────────────────────────────────── @app.get("/api/data") def api_data(): """Return current cached metrics + scores.""" cache = load_cache() scored = cache.get("_scored", {}) price_data = cache.get("price", {}) drawdown_data = cache.get("drawdown", {}) extras = cache.get("price_extras", {}) return { "scored": scored, "price": price_data.get("price"), "change_24h": price_data.get("change_24h"), "ath": drawdown_data.get("ath"), "mayer_multiple": extras.get("mayer_multiple"), "sma_200d": extras.get("sma_200d"), "last_update": cache.get("_timestamp"), "scraper_running": _scraper_running, "last_error": _last_error, } @app.get("/api/history") def api_history(): return load_history()[-90:] # Last 90 entries @app.post("/api/refresh") def api_refresh(): """Trigger a manual scrape.""" if _scraper_running: return JSONResponse({"error": "Scrape already in progress"}, status_code=409) t = threading.Thread(target=run_scrape, daemon=True) t.start() return {"ok": True, "message": "Scrape started"} # Settings routes (preserved) @app.get("/api/settings") def api_get_settings(): return _safe_settings(_load_llm_settings()) @app.post("/api/settings") def api_save_settings(body: LLMSettingsUpdate): existing = _load_llm_settings() new_settings = {"provider": body.provider, "model": body.model, "providers": body.providers} _merge_api_keys(new_settings["providers"], existing.get("providers", {})) with open(LLM_SETTINGS_PATH, "w") as f: json.dump(new_settings, f, indent=2) return {"ok": True, "message": "Settings saved"} @app.post("/api/settings/test") def api_test_connection(body: TestConnectionRequest): existing = _load_llm_settings() providers = json.loads(json.dumps(body.providers)) _merge_api_keys(providers, existing.get("providers", {})) try: models = _fetch_models(body.provider, providers) return {"ok": True, "models": models, "message": f"Connected — {len(models)} model(s) found"} except requests.exceptions.ConnectionError: return JSONResponse({"ok": False, "error": "Connection refused"}, status_code=502) except Exception as e: return JSONResponse({"ok": False, "error": str(e)}, status_code=500) @app.post("/api/settings/models") def api_fetch_models(body: FetchModelsRequest): existing = _load_llm_settings() providers = json.loads(json.dumps(body.providers)) _merge_api_keys(providers, existing.get("providers", {})) try: models = _fetch_models(body.provider, providers) return {"ok": True, "models": models} except Exception as e: return JSONResponse({"ok": False, "error": str(e)}, status_code=500) # ── HTML Pages ──────────────────────────────────────────────────────────── SHARED_CSS = """ *,*::before,*::after{box-sizing:border-box;margin:0;padding:0} :root{--bg:#0f172a;--card:#1e293b;--card-hover:#253349;--text:#e2e8f0;--text-dim:#94a3b8; --accent:#f7931a;--green:#22c55e;--red:#ef4444;--yellow:#eab308;--border:#334155; --mono:'JetBrains Mono','Fira Code','Courier New',monospace;--cyan:#22d3ee; --bright-green:#4ade80;--score-excellent:#22c55e;--score-good:#4ade80; --score-neutral:#eab308;--score-bad:#f97316;--score-terrible:#ef4444} body{font-family:'Inter',sans-serif;background:var(--bg);color:var(--text);min-height:100vh} .container{max-width:1400px;margin:0 auto;padding:16px} h1{font-size:1.5rem;font-weight:700;display:flex;align-items:center;gap:10px} h1 .btc{color:var(--accent);font-size:1.8rem} h2{font-size:.8rem;font-weight:600;color:var(--text-dim);margin-bottom:12px;text-transform:uppercase;letter-spacing:.05em} .header{display:flex;justify-content:space-between;align-items:center;padding:16px 0;border-bottom:1px solid var(--border);margin-bottom:16px;flex-wrap:wrap;gap:12px} .nav{display:flex;gap:4px;align-items:center} .nav a{color:var(--text-dim);text-decoration:none;font-size:.85rem;font-weight:600;padding:6px 14px;border-radius:6px;transition:all .15s} .nav a:hover{color:var(--text);background:var(--card)} .nav a.active{color:var(--cyan);background:var(--card);border:1px solid var(--border)} .btn{padding:8px 18px;border:none;border-radius:6px;font-family:inherit;font-weight:600;font-size:.85rem;cursor:pointer;transition:all .15s} .btn-accent{background:var(--accent);color:#000}.btn-accent:hover{background:#e8850f} .btn-secondary{background:var(--border);color:var(--text)}.btn-secondary:hover{background:var(--card-hover)} .btn-cyan{background:var(--cyan);color:#000}.btn-cyan:hover{background:#06b6d4} .btn:disabled{opacity:.4;cursor:not-allowed} .card{background:var(--card);border-radius:10px;padding:16px;border:1px solid var(--border)} .footer{text-align:center;color:var(--text-dim);font-size:.75rem;padding:20px 0;margin-top:16px;border-top:1px solid var(--border)} .toast{position:fixed;top:20px;right:20px;padding:12px 20px;border-radius:8px;font-size:.85rem;font-weight:600;z-index:9999;opacity:0;transform:translateY(-10px);transition:all .3s;pointer-events:none} .toast.show{opacity:1;transform:translateY(0)} .toast-success{background:var(--green);color:#000} .toast-error{background:var(--red);color:#fff} """ SHARED_HEAD = """ """ NAV_HTML = """
""" TOAST_JS = """ function showToast(msg, type) { let t = document.getElementById('toast'); if (!t) { t = document.createElement('div'); t.id = 'toast'; t.className = 'toast'; document.body.appendChild(t); } t.textContent = msg; t.className = 'toast toast-' + type + ' show'; setTimeout(() => t.classList.remove('show'), 3500); } """ DASHBOARD_HTML = """ """ + SHARED_HEAD + """