#!/usr/bin/env python3 """ Bitcoin Accumulation Zone Monitor — Web Dashboard FastAPI server with inline HTML/CSS/JS dashboard. Monitors on-chain metrics to identify optimal BTC accumulation zones. """ import asyncio import json import logging import os import sys import threading import time import traceback from datetime import datetime, timezone import requests from fastapi import FastAPI from fastapi.responses import HTMLResponse, JSONResponse from pydantic import BaseModel logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s") log = logging.getLogger("btc-monitor") BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, BASE_DIR) from scrapers import fear_greed, price from scoring import engine app = FastAPI(title="Bitcoin Accumulation Zone Monitor") CONFIG_DIR = os.path.join(BASE_DIR, "config") DATA_DIR = os.path.join(BASE_DIR, "data") CACHE_PATH = os.path.join(DATA_DIR, "cache.json") HISTORY_PATH = os.path.join(DATA_DIR, "score_history.jsonl") LLM_SETTINGS_PATH = os.path.join(CONFIG_DIR, "llm_settings.json") os.makedirs(DATA_DIR, exist_ok=True) # Background scraper state _scraper_lock = threading.Lock() _scraper_running = False _last_update = None _last_error = None # ── Cache management ────────────────────────────────────────────────────── def load_cache(): if os.path.exists(CACHE_PATH): try: with open(CACHE_PATH) as f: return json.load(f) except Exception: pass return {} def save_cache(data): with open(CACHE_PATH, "w") as f: json.dump(data, f, indent=2, default=str) def append_history(score_data): """Append a daily score entry to history.""" entry = { "timestamp": datetime.now(timezone.utc).isoformat(), "composite_score": score_data.get("composite_score", 0), "scored_count": score_data.get("scored_count", 0), "metrics": { m["key"]: {"score": m["score"], "value": m["value"]} for m in score_data.get("metrics", []) }, } with open(HISTORY_PATH, "a") as f: f.write(json.dumps(entry) + "\n") def load_history(): if not os.path.exists(HISTORY_PATH): return [] entries = [] with open(HISTORY_PATH) as f: for line in f: line = line.strip() if line: try: entries.append(json.loads(line)) except Exception: pass return entries # ── Background scraper ──────────────────────────────────────────────────── def run_scrape(force_full=False): """Run a scrape cycle and update cache. By default, only refreshes fast data (price, F&G) and reuses cached on-chain data. On-chain metrics (Playwright scrapes) only refresh if: - force_full=True (manual full refresh) - No cached on-chain data exists - Cached on-chain data is >6 hours old (they update daily) """ global _last_update, _last_error, _scraper_running with _scraper_lock: if _scraper_running: return _scraper_running = True try: # Load existing cache to preserve on-chain data existing_cache = load_cache() metrics = {} # 1. Fear & Greed (fast API call) log.info("Fetching Fear & Greed...") metrics["fear_greed"] = fear_greed.fetch() # 2. BTC Price data (fast API calls) log.info("Fetching BTC price...") price_current = price.fetch_current() metrics["price"] = price_current log.info("Fetching BTC ATH...") ath_data = price.fetch_ath() ath_val = ath_data.get("ath") or existing_cache.get("drawdown", {}).get("ath") if price_current.get("price") and ath_val: drawdown = price.calculate_drawdown(price_current["price"], ath_val) metrics["drawdown"] = {"value": drawdown, "ath": ath_val} elif existing_cache.get("drawdown", {}).get("value") is not None: log.info("ATH fetch failed — reusing cached drawdown") metrics["drawdown"] = existing_cache["drawdown"] else: metrics["drawdown"] = {"value": None} log.info("Fetching historical prices for 200D SMA / Mayer...") hist = price.fetch_historical() if hist: sma_200d = price.calculate_200d_sma(hist) mayer = price.calculate_mayer_multiple(price_current.get("price"), sma_200d) metrics["price_extras"] = {"sma_200d": sma_200d, "mayer_multiple": mayer} else: # CoinGecko rate-limited — compute from history.json instead try: hist_path = os.path.join(DATA_DIR, "history.json") with open(hist_path) as f: hdata = json.load(f) btc_vals = hdata.get("btc_price", {}).get("values", []) if len(btc_vals) >= 200: sma_200d = sum(btc_vals[-200:]) / 200 cur_p = price_current.get("price") or btc_vals[-1] mayer = cur_p / sma_200d if sma_200d else None metrics["price_extras"] = {"sma_200d": sma_200d, "mayer_multiple": round(mayer, 4) if mayer else None} log.info("Computed 200D SMA from history.json (CoinGecko rate-limited)") elif existing_cache.get("price_extras"): metrics["price_extras"] = existing_cache["price_extras"] except Exception: if existing_cache.get("price_extras"): metrics["price_extras"] = existing_cache["price_extras"] log.info("Reusing cached price_extras") # 3. On-chain metrics — use cached values (historical data is permanent) onchain_keys = ["puell_multiple", "mvrv_zscore", "reserve_risk", "rhodl_ratio", "nupl", "200w_sma", "lth_realized_price", "hash_ribbons", "pi_cycle_bottom", "lth_supply"] has_cached_onchain = any(existing_cache.get(k, {}).get("value") is not None for k in onchain_keys) if force_full or not has_cached_onchain: # Only do a full Playwright scrape if explicitly requested or no data exists log.info("Scraping on-chain metrics from LookIntoBitcoin (full refresh requested)...") try: from scrapers import lookintobitcoin onchain = lookintobitcoin.scrape_all() metrics.update(onchain) metrics["_onchain_timestamp"] = datetime.now(timezone.utc).isoformat() except Exception as e: log.error("LookIntoBitcoin scraping failed: %s\n%s", e, traceback.format_exc()) _last_error = f"On-chain scraping failed: {e}" for k in onchain_keys: if k in existing_cache: metrics[k] = existing_cache[k] else: # Reuse cached on-chain values — they're stored permanently log.info("Reusing cached on-chain data (use Full Refresh to re-scrape)") for k in onchain_keys: if k in existing_cache: metrics[k] = existing_cache[k] if "_onchain_timestamp" in existing_cache: metrics["_onchain_timestamp"] = existing_cache["_onchain_timestamp"] # 4. Score everything log.info("Scoring metrics...") scored = engine.score_all(metrics) metrics["_scored"] = scored metrics["_timestamp"] = datetime.now(timezone.utc).isoformat() save_cache(metrics) append_history(scored) # Append today's values to permanent history (incremental, not full re-scrape) try: from scrapers.history_updater import update_history update_history() except Exception as e: log.warning("History update failed (non-critical): %s", e) _last_update = datetime.now(timezone.utc).isoformat() _last_error = None log.info("Scrape cycle complete. Composite score: %s", scored["composite_score"]) except Exception as e: log.error("Scrape cycle error: %s\n%s", e, traceback.format_exc()) _last_error = str(e) finally: with _scraper_lock: _scraper_running = False def scraper_loop(): """Background loop: quick refresh every 15min. Full scrape only on first boot with no data.""" cache = load_cache() has_data = any(cache.get(k, {}).get("value") is not None for k in ["puell_multiple", "mvrv_zscore", "nupl"]) run_scrape(force_full=not has_data) # Full only if no cached on-chain data while True: time.sleep(900) # 15 minutes run_scrape() # Quick refresh only # Start background scraper on import _scraper_thread = threading.Thread(target=scraper_loop, daemon=True) _scraper_thread.start() # ── LLM Settings (preserved from original) ─────────────────────────────── class LLMSettingsUpdate(BaseModel): provider: str model: str providers: dict class TestConnectionRequest(BaseModel): provider: str providers: dict class FetchModelsRequest(BaseModel): provider: str providers: dict def _load_llm_settings(): if os.path.exists(LLM_SETTINGS_PATH): with open(LLM_SETTINGS_PATH) as f: return json.load(f) return { "provider": "ollama", "model": "qwen3.5:27b", "providers": { "ollama": {"base_url": "http://100.100.242.21:11434"}, "lmstudio": {"base_url": "http://100.100.242.21:1234"}, "openai": {"api_key": ""}, "anthropic": {"api_key": ""}, "openrouter": {"api_key": ""}, }, } def _mask_api_key(key): if not key or len(key) < 8: return "" return "••••••••" + key[-4:] def _safe_settings(settings): out = json.loads(json.dumps(settings)) for name, cfg in out.get("providers", {}).items(): if "api_key" in cfg: cfg["api_key"] = _mask_api_key(cfg["api_key"]) return out def _merge_api_keys(new_providers, existing_providers): for name, cfg in new_providers.items(): if "api_key" in cfg: masked = cfg["api_key"] if masked.startswith("••••") or masked == "": existing_key = existing_providers.get(name, {}).get("api_key", "") cfg["api_key"] = existing_key def _fetch_models(provider, providers): cfg = providers.get(provider, {}) if provider == "ollama": base_url = cfg.get("base_url", "http://100.100.242.21:11434") resp = requests.get(f"{base_url}/api/tags", timeout=10) resp.raise_for_status() return [{"id": m["name"], "name": m["name"]} for m in resp.json().get("models", [])] elif provider == "lmstudio": base_url = cfg.get("base_url", "http://100.100.242.21:1234") resp = requests.get(f"{base_url}/v1/models", timeout=10) resp.raise_for_status() return [{"id": m["id"], "name": m["id"]} for m in resp.json().get("data", [])] elif provider == "openai": api_key = cfg.get("api_key", "") if not api_key: raise ValueError("OpenAI API key is required") resp = requests.get("https://api.openai.com/v1/models", headers={"Authorization": f"Bearer {api_key}"}, timeout=15) resp.raise_for_status() models = [m for m in resp.json().get("data", []) if m["id"].startswith("gpt-")] models.sort(key=lambda m: m["id"]) return [{"id": m["id"], "name": m["id"]} for m in models] elif provider == "anthropic": api_key = cfg.get("api_key", "") if not api_key: raise ValueError("Anthropic API key is required") resp = requests.get("https://api.anthropic.com/v1/models", headers={"x-api-key": api_key, "anthropic-version": "2023-06-01"}, timeout=15) resp.raise_for_status() return [{"id": m["id"], "name": m.get("display_name", m["id"])} for m in resp.json().get("data", [])] elif provider == "openrouter": resp = requests.get("https://openrouter.ai/api/v1/models", timeout=15) resp.raise_for_status() models = resp.json().get("data", []) models.sort(key=lambda m: m.get("id", "")) return [{"id": m["id"], "name": m.get("name", m["id"])} for m in models[:200]] else: raise ValueError(f"Unknown provider: {provider}") # ── API Routes ──────────────────────────────────────────────────────────── @app.get("/api/data") def api_data(): """Return current cached metrics + scores.""" cache = load_cache() scored = cache.get("_scored", {}) price_data = cache.get("price", {}) drawdown_data = cache.get("drawdown", {}) extras = cache.get("price_extras", {}) return { "scored": scored, "price": price_data.get("price"), "change_24h": price_data.get("change_24h"), "ath": drawdown_data.get("ath"), "mayer_multiple": extras.get("mayer_multiple"), "sma_200d": extras.get("sma_200d"), "last_update": cache.get("_timestamp"), "scraper_running": _scraper_running, "last_error": _last_error, } @app.get("/api/history") def api_history(): return load_history()[-90:] # Last 90 entries @app.post("/api/refresh") def api_refresh(full: bool = False): """Trigger a scrape. Quick refresh (default) updates price + F&G only (~2s). Full refresh (?full=true) also re-scrapes on-chain data via Playwright (~2-3min).""" if _scraper_running: return JSONResponse({"error": "Scrape already in progress"}, status_code=409) t = threading.Thread(target=run_scrape, kwargs={"force_full": full}, daemon=True) t.start() mode = "full (on-chain + price + F&G)" if full else "quick (price + F&G only)" return {"ok": True, "message": f"Scrape started — {mode}"} # Settings routes (preserved) @app.get("/api/settings") def api_get_settings(): return _safe_settings(_load_llm_settings()) @app.post("/api/settings") def api_save_settings(body: LLMSettingsUpdate): existing = _load_llm_settings() new_settings = {"provider": body.provider, "model": body.model, "providers": body.providers} _merge_api_keys(new_settings["providers"], existing.get("providers", {})) with open(LLM_SETTINGS_PATH, "w") as f: json.dump(new_settings, f, indent=2) return {"ok": True, "message": "Settings saved"} @app.post("/api/settings/test") def api_test_connection(body: TestConnectionRequest): existing = _load_llm_settings() providers = json.loads(json.dumps(body.providers)) _merge_api_keys(providers, existing.get("providers", {})) try: models = _fetch_models(body.provider, providers) return {"ok": True, "models": models, "message": f"Connected — {len(models)} model(s) found"} except requests.exceptions.ConnectionError: return JSONResponse({"ok": False, "error": "Connection refused"}, status_code=502) except Exception as e: return JSONResponse({"ok": False, "error": str(e)}, status_code=500) @app.post("/api/settings/models") def api_fetch_models(body: FetchModelsRequest): existing = _load_llm_settings() providers = json.loads(json.dumps(body.providers)) _merge_api_keys(providers, existing.get("providers", {})) try: models = _fetch_models(body.provider, providers) return {"ok": True, "models": models} except Exception as e: return JSONResponse({"ok": False, "error": str(e)}, status_code=500) # ── HTML Pages ──────────────────────────────────────────────────────────── SHARED_CSS = """ *,*::before,*::after{box-sizing:border-box;margin:0;padding:0} :root{--bg:#0f172a;--card:#1e293b;--card-hover:#253349;--text:#e2e8f0;--text-dim:#94a3b8; --accent:#f7931a;--green:#22c55e;--red:#ef4444;--yellow:#eab308;--border:#334155; --mono:'JetBrains Mono','Fira Code','Courier New',monospace;--cyan:#22d3ee; --bright-green:#4ade80;--score-excellent:#22c55e;--score-good:#4ade80; --score-neutral:#eab308;--score-bad:#f97316;--score-terrible:#ef4444} body{font-family:'Inter',sans-serif;background:var(--bg);color:var(--text);min-height:100vh} .container{max-width:1400px;margin:0 auto;padding:16px} h1{font-size:1.5rem;font-weight:700;display:flex;align-items:center;gap:10px} h1 .btc{color:var(--accent);font-size:1.8rem} h2{font-size:.8rem;font-weight:600;color:var(--text-dim);margin-bottom:12px;text-transform:uppercase;letter-spacing:.05em} .header{display:flex;justify-content:space-between;align-items:center;padding:16px 0;border-bottom:1px solid var(--border);margin-bottom:16px;flex-wrap:wrap;gap:12px} .nav{display:flex;gap:4px;align-items:center} .nav a{color:var(--text-dim);text-decoration:none;font-size:.85rem;font-weight:600;padding:6px 14px;border-radius:6px;transition:all .15s} .nav a:hover{color:var(--text);background:var(--card)} .nav a.active{color:var(--cyan);background:var(--card);border:1px solid var(--border)} .btn{padding:8px 18px;border:none;border-radius:6px;font-family:inherit;font-weight:600;font-size:.85rem;cursor:pointer;transition:all .15s} .btn-accent{background:var(--accent);color:#000}.btn-accent:hover{background:#e8850f} .btn-secondary{background:var(--border);color:var(--text)}.btn-secondary:hover{background:var(--card-hover)} .btn-cyan{background:var(--cyan);color:#000}.btn-cyan:hover{background:#06b6d4} .btn:disabled{opacity:.4;cursor:not-allowed} .card{background:var(--card);border-radius:10px;padding:16px;border:1px solid var(--border)} .footer{text-align:center;color:var(--text-dim);font-size:.75rem;padding:20px 0;margin-top:16px;border-top:1px solid var(--border)} .toast{position:fixed;top:20px;right:20px;padding:12px 20px;border-radius:8px;font-size:.85rem;font-weight:600;z-index:9999;opacity:0;transform:translateY(-10px);transition:all .3s;pointer-events:none} .toast.show{opacity:1;transform:translateY(0)} .toast-success{background:var(--green);color:#000} .toast-error{background:var(--red);color:#fff} """ SHARED_HEAD = """ """ NAV_HTML = """""" TOAST_JS = """ function showToast(msg, type) { let t = document.getElementById('toast'); if (!t) { t = document.createElement('div'); t.id = 'toast'; t.className = 'toast'; document.body.appendChild(t); } t.textContent = msg; t.className = 'toast toast-' + type + ' show'; setTimeout(() => t.classList.remove('show'), 3500); } """ DASHBOARD_HTML = """ """ + SHARED_HEAD + """ Bitcoin Accumulation Zone Monitor

Accumulation Zone Monitor

""" + NAV_HTML + """
Loading...
--
of 100
Loading...
--
ATH: -- Mayer: -- 200D SMA: --

On-Chain Metrics

Loading metrics...

Composite Score History

""" SETTINGS_HTML = """ """ + SHARED_HEAD + """ Settings — Bitcoin Accumulation Zone Monitor

Accumulation Zone Monitor

""" + NAV_HTML + """

⚙ LLM Provider Settings

Provider

Connection

""" # ── Backtest API ─────────────────────────────────────────────────────── _history_collector_running = False _history_collector_progress = {} @app.get("/api/backtest") def api_backtest(): """Run backtest and return full results.""" try: from backtesting.engine import run_backtest return run_backtest() except Exception as e: log.error("Backtest error: %s", traceback.format_exc()) return JSONResponse({"error": str(e)}, status_code=500) @app.get("/api/backtest/history") def api_backtest_history(): """Return historical daily scores + prices for charting.""" try: from backtesting.engine import run_backtest result = run_backtest() return {"chart_data": result.get("chart_data", []), "date_range": result.get("date_range")} except Exception as e: return JSONResponse({"error": str(e)}, status_code=500) @app.post("/api/backtest/collect") def api_backtest_collect(): """Trigger historical data collection.""" global _history_collector_running, _history_collector_progress if _history_collector_running: return JSONResponse({"error": "Collection already in progress", "progress": _history_collector_progress}, status_code=409) def _run_collector(): global _history_collector_running, _history_collector_progress _history_collector_running = True _history_collector_progress = {"status": "starting", "current": "", "step": 0, "total": 0} try: from scrapers.history_collector import collect_all_history def progress_cb(metric, step, total): _history_collector_progress = {"status": "scraping", "current": metric, "step": step + 1, "total": total} collect_all_history(progress_cb=progress_cb) _history_collector_progress = {"status": "complete"} except Exception as e: log.error("History collection error: %s", traceback.format_exc()) _history_collector_progress = {"status": "error", "error": str(e)} finally: _history_collector_running = False t = threading.Thread(target=_run_collector, daemon=True) t.start() return {"ok": True, "message": "Collection started"} @app.get("/api/backtest/status") def api_backtest_status(): """Check if historical data exists and collection status.""" from scrapers.history_collector import history_status status = history_status() status["collecting"] = _history_collector_running status["progress"] = _history_collector_progress return status # ── Backtest HTML Page ───────────────────────────────────────────────── BACKTEST_HTML = """ """ + SHARED_HEAD + """ Historical Backtest — Bitcoin Accumulation Zone Monitor

Accumulation Zone Monitor

""" + NAV_HTML + """
""" @app.get("/", response_class=HTMLResponse) def dashboard(): return DASHBOARD_HTML @app.get("/backtest", response_class=HTMLResponse) def backtest_page(): return BACKTEST_HTML @app.get("/settings", response_class=HTMLResponse) def settings_page(): return SETTINGS_HTML if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=3088)