feat: add historical backtest engine and dashboard page

- scrapers/history_collector.py: scrapes full time series from 8 LookIntoBitcoin
  charts + Fear & Greed API, stores to data/history.json (~5700 days back to 2010)
- backtesting/engine.py: scores each historical day using same thresholds as live
  scoring, computes 30d/90d/180d/1yr forward returns, bracket stats, signal events
- dashboard/server.py: adds /backtest page with dual-axis score vs price chart,
  bracket performance table, signal event list, current context box; adds backtest
  nav link and historical context box on main dashboard; 4 new API endpoints

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
BizzleBot 2026-03-20 22:50:57 +00:00
parent e3c5aa9f32
commit 5b3b3811ec
4 changed files with 1168 additions and 0 deletions

0
backtesting/__init__.py Normal file
View File

412
backtesting/engine.py Normal file
View File

@ -0,0 +1,412 @@
"""Historical backtest engine for Bitcoin Accumulation Zone scoring."""
import json
import logging
import os
import sys
from collections import defaultdict
from datetime import datetime, timedelta
log = logging.getLogger(__name__)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, BASE_DIR)
HISTORY_PATH = os.path.join(BASE_DIR, "data", "history.json")
CACHE_PATH = os.path.join(BASE_DIR, "data", "cache.json")
# Score brackets matching the dashboard assessment levels
BRACKETS = [
(0, 20, "Extreme Caution"),
(21, 40, "Caution"),
(41, 55, "Neutral"),
(56, 70, "Moderate Opportunity"),
(71, 85, "Strong Accumulation"),
(86, 100, "Extreme Accumulation"),
]
# Scoring thresholds — replicated from scoring/engine.py for standalone use
METRIC_SCORERS = {
"fear_greed": {
"ranges": [[None, 10, 10], [10, 25, 7], [25, 45, 4], [45, 55, 2], [55, 75, 1], [75, None, 0]],
},
"puell_multiple": {
"ranges": [[None, 0.3, 10], [0.3, 0.5, 8], [0.5, 0.8, 5], [0.8, 1.2, 3], [1.2, 2.0, 1], [2.0, None, 0]],
},
"mvrv_zscore": {
"ranges": [[None, 0, 10], [0, 0.5, 8], [0.5, 1.5, 5], [1.5, 3, 2], [3, 5, 1], [5, None, 0]],
},
"reserve_risk": {
"ranges": [[None, 0.002, 10], [0.002, 0.005, 7], [0.005, 0.01, 4], [0.01, 0.02, 2], [0.02, None, 0]],
},
"rhodl_ratio": {
"ranges": [[None, 100, 10], [100, 500, 7], [500, 2000, 4], [2000, 10000, 1], [10000, None, 0]],
},
"nupl": {
"ranges": [[None, 0, 10], [0, 0.25, 7], [0.25, 0.5, 4], [0.5, 0.75, 1], [0.75, None, 0]],
},
}
# Ratio-based metrics: score based on price vs reference value
RATIO_SCORERS = {
"price_vs_200w_sma": {
# pct_above ranges
"ranges": [[None, 0, 10], [0, 20, 6], [20, 50, 3], [50, 100, 1], [100, None, 0]],
"price_key": "btc_price",
"ref_key": "200w_sma",
},
"lth_realized_price": {
"ranges": [[None, 0, 10], [0, 20, 6], [20, 50, 3], [50, None, 1]],
"price_key": "btc_price",
"ref_key": "lth_realized_price",
},
}
# Drawdown scoring
DRAWDOWN_RANGES = [[70, None, 10], [50, 70, 8], [30, 50, 6], [20, 30, 4], [10, 20, 2], [None, 10, 0]]
def _score_range(value, ranges):
"""Score a value using range-based thresholds."""
if value is None:
return None
for low, high, score in ranges:
low_ok = low is None or value >= low
high_ok = high is None or value < high
if low_ok and high_ok:
return score
return 0
def _build_daily_index(history):
"""Build a dict mapping metric_key -> {date_str: value} for fast lookup."""
index = {}
for key, data in history.items():
if key.startswith("_") or not isinstance(data, dict) or "dates" not in data:
continue
lookup = {}
for d, v in zip(data["dates"], data["values"]):
lookup[d] = v
index[key] = lookup
return index
def _get_all_dates(index):
"""Get sorted union of all dates across all metrics."""
all_dates = set()
for lookup in index.values():
all_dates.update(lookup.keys())
return sorted(all_dates)
def _last_known_value(lookup, date, max_lookback=30):
"""Get value for date, or most recent prior value within lookback window."""
if date in lookup:
return lookup[date]
d = datetime.strptime(date, "%Y-%m-%d")
for i in range(1, max_lookback + 1):
prev = (d - timedelta(days=i)).strftime("%Y-%m-%d")
if prev in lookup:
return lookup[prev]
return None
def _compute_ath_series(price_lookup, dates):
"""Compute running ATH and drawdown for each date."""
ath = 0
drawdowns = {}
for d in dates:
p = price_lookup.get(d)
if p is None:
continue
if p > ath:
ath = p
if ath > 0:
drawdowns[d] = ((ath - p) / ath) * 100
return drawdowns
def score_day(date, index, drawdowns):
"""Score a single day using all available metrics. Returns (composite_score, individual_scores, n_metrics)."""
scores = []
details = {}
# Simple range-based metrics
for metric_key, cfg in METRIC_SCORERS.items():
val = _last_known_value(index.get(metric_key, {}), date)
if val is not None:
s = _score_range(val, cfg["ranges"])
if s is not None:
scores.append(s)
details[metric_key] = {"value": val, "score": s}
# Ratio-based metrics (price vs reference)
for metric_key, cfg in RATIO_SCORERS.items():
price_val = _last_known_value(index.get(cfg["price_key"], {}), date)
# Try alternate price keys
if price_val is None:
for pk in ["btc_price_coingecko", "btc_price_sma", "btc_price_lth"]:
price_val = _last_known_value(index.get(pk, {}), date)
if price_val is not None:
break
ref_val = _last_known_value(index.get(cfg["ref_key"], {}), date)
if price_val is not None and ref_val is not None and ref_val > 0:
pct_above = ((price_val - ref_val) / ref_val) * 100
s = _score_range(pct_above, cfg["ranges"])
if s is not None:
scores.append(s)
details[metric_key] = {"value": pct_above, "score": s}
# Drawdown
dd = drawdowns.get(date)
if dd is not None:
s = _score_range(dd, DRAWDOWN_RANGES)
if s is not None:
scores.append(s)
details["drawdown"] = {"value": dd, "score": s}
if not scores:
return None, details, 0
composite = sum(scores) / len(scores) * 10
return round(composite, 1), details, len(scores)
def compute_forward_returns(price_lookup, dates_sorted):
"""Precompute forward returns for all dates."""
periods = [30, 90, 180, 365]
returns = {}
for d in dates_sorted:
p0 = price_lookup.get(d)
if p0 is None or p0 <= 0:
continue
r = {}
dt = datetime.strptime(d, "%Y-%m-%d")
for days in periods:
future = (dt + timedelta(days=days)).strftime("%Y-%m-%d")
pf = price_lookup.get(future)
if pf is not None:
r[f"{days}d"] = round(((pf - p0) / p0) * 100, 2)
if r:
returns[d] = r
return returns
def compute_max_drawdown_forward(price_lookup, date, window=90):
"""Compute max drawdown within N days after a given date."""
dt = datetime.strptime(date, "%Y-%m-%d")
p0 = price_lookup.get(date)
if p0 is None or p0 <= 0:
return None
peak = p0
max_dd = 0
for i in range(1, window + 1):
future = (dt + timedelta(days=i)).strftime("%Y-%m-%d")
pf = price_lookup.get(future)
if pf is None:
continue
if pf > peak:
peak = pf
dd = ((peak - pf) / peak) * 100
if dd > max_dd:
max_dd = dd
return round(max_dd, 2) if max_dd > 0 else 0
def run_backtest():
"""Run the full backtest and return comprehensive results."""
log.info("Loading historical data...")
if not os.path.exists(HISTORY_PATH):
return {"error": "No historical data found. Run history collector first."}
with open(HISTORY_PATH) as f:
history = json.load(f)
index = _build_daily_index(history)
# Build price lookup (prefer coingecko for completeness)
price_lookup = {}
for pk in ["btc_price_coingecko", "btc_price", "btc_price_sma", "btc_price_lth"]:
if pk in index:
for d, v in index[pk].items():
if d not in price_lookup:
price_lookup[d] = v
all_dates = _get_all_dates(index)
if not all_dates:
return {"error": "No date data available."}
log.info("Date range: %s to %s (%d days)", all_dates[0], all_dates[-1], len(all_dates))
# Compute drawdowns
drawdowns = _compute_ath_series(price_lookup, all_dates)
# Precompute forward returns
log.info("Computing forward returns...")
fwd_returns = compute_forward_returns(price_lookup, all_dates)
# Score each day
log.info("Scoring %d days...", len(all_dates))
daily_scores = []
for d in all_dates:
composite, details, n_metrics = score_day(d, index, drawdowns)
if composite is not None and n_metrics >= 3: # Require at least 3 metrics
price = price_lookup.get(d)
entry = {
"date": d,
"score": composite,
"n_metrics": n_metrics,
"price": price,
"forward_returns": fwd_returns.get(d, {}),
}
daily_scores.append(entry)
if not daily_scores:
return {"error": "No scored days (insufficient metric overlap)."}
log.info("Scored %d days with 3+ metrics", len(daily_scores))
# --- Bracket statistics ---
bracket_stats = []
for low, high, label in BRACKETS:
days_in = [d for d in daily_scores if low <= d["score"] <= high]
if not days_in:
bracket_stats.append({
"range": f"{low}-{high}", "label": label, "days": 0,
})
continue
stats = {"range": f"{low}-{high}", "label": label, "days": len(days_in)}
for period in ["30d", "90d", "180d", "365d"]:
returns = [d["forward_returns"][period] for d in days_in if period in d["forward_returns"]]
if returns:
returns_sorted = sorted(returns)
stats[f"avg_{period}"] = round(sum(returns) / len(returns), 2)
stats[f"median_{period}"] = round(returns_sorted[len(returns_sorted) // 2], 2)
stats[f"win_rate_{period}"] = round(len([r for r in returns if r > 0]) / len(returns) * 100, 1)
stats[f"max_gain_{period}"] = round(max(returns), 2)
stats[f"max_loss_{period}"] = round(min(returns), 2)
stats[f"n_{period}"] = len(returns)
# Average max drawdown within 90 days
dd_list = []
for d in days_in:
dd = compute_max_drawdown_forward(price_lookup, d["date"], 90)
if dd is not None:
dd_list.append(dd)
if dd_list:
stats["avg_max_drawdown_90d"] = round(sum(dd_list) / len(dd_list), 2)
bracket_stats.append(stats)
# --- Peak signal events ---
signal_events = []
thresholds = [90, 80, 70]
for thresh in thresholds:
prev_score = 0
for d in daily_scores:
if d["score"] >= thresh and prev_score < thresh:
event = {
"date": d["date"],
"score": d["score"],
"threshold": thresh,
"price": d["price"],
"forward_returns": d["forward_returns"],
}
# Add future prices
if d["price"]:
dt = datetime.strptime(d["date"], "%Y-%m-%d")
for days_ahead in [30, 90, 365]:
future = (dt + timedelta(days=days_ahead)).strftime("%Y-%m-%d")
fp = price_lookup.get(future)
if fp:
event[f"price_{days_ahead}d"] = round(fp, 2)
signal_events.append(event)
prev_score = d["score"]
signal_events.sort(key=lambda e: e["date"])
# --- Current signal context ---
all_scores_list = [d["score"] for d in daily_scores]
all_scores_list.sort()
# Get current score from cache
current_score = None
current_price = None
if os.path.exists(CACHE_PATH):
try:
with open(CACHE_PATH) as f:
cache = json.load(f)
scored = cache.get("_scored", {})
current_score = scored.get("composite_score")
current_price = cache.get("price", {}).get("price")
except Exception:
pass
# If no cache, use latest daily score
if current_score is None and daily_scores:
current_score = daily_scores[-1]["score"]
current_price = daily_scores[-1].get("price")
current_context = None
if current_score is not None:
# Percentile
below = len([s for s in all_scores_list if s <= current_score])
percentile = round(below / len(all_scores_list) * 100, 1)
# Find comparable historical periods
comparable = []
margin = 5
for d in daily_scores:
if abs(d["score"] - current_score) <= margin and d["forward_returns"]:
comparable.append(d)
avg_1yr = None
if comparable:
yr_returns = [d["forward_returns"]["365d"] for d in comparable if "365d" in d["forward_returns"]]
if yr_returns:
avg_1yr = round(sum(yr_returns) / len(yr_returns), 2)
# Best comparable examples (most recent 5)
examples = []
for d in comparable[-5:]:
examples.append({
"date": d["date"],
"score": d["score"],
"price": d["price"],
"forward_returns": d["forward_returns"],
})
current_context = {
"current_score": current_score,
"current_price": current_price,
"percentile": percentile,
"comparable_days": len(comparable),
"avg_1yr_return": avg_1yr,
"examples": examples,
}
# --- Build time series for charting ---
# Downsample to weekly for chart efficiency
chart_data = []
for i, d in enumerate(daily_scores):
# Include every 7th day + last day
if i % 7 == 0 or i == len(daily_scores) - 1:
chart_data.append({
"date": d["date"],
"score": d["score"],
"price": d["price"],
})
result = {
"date_range": {"start": daily_scores[0]["date"], "end": daily_scores[-1]["date"]},
"total_days_scored": len(daily_scores),
"bracket_stats": bracket_stats,
"signal_events": signal_events,
"current_context": current_context,
"chart_data": chart_data,
"computed_at": datetime.utcnow().isoformat() + "Z",
}
log.info("Backtest complete: %d days, %d signal events", len(daily_scores), len(signal_events))
return result

View File

@ -389,6 +389,7 @@ SHARED_HEAD = """<meta charset="UTF-8">
NAV_HTML = """<div class="nav">
<a href="/" id="nav-dashboard">Dashboard</a>
<a href="/backtest" id="nav-backtest">&#x1F4CA; Backtest</a>
<a href="/settings" id="nav-settings">&#9881; Settings</a>
</div>"""
@ -488,6 +489,13 @@ DASHBOARD_HTML = """<!DOCTYPE html>
</div>
</div>
<!-- Historical Context (from backtest) -->
<div class="card" id="histContext" style="margin-bottom:20px;display:none;border-color:#22d3ee">
<h2 style="color:#22d3ee">Historical Context</h2>
<div id="histContextText" style="font-size:.9rem;font-family:var(--mono);line-height:1.6"></div>
<a href="/backtest" style="font-size:.8rem;color:#22d3ee;text-decoration:none;margin-top:8px;display:inline-block">View full backtest &rarr;</a>
</div>
<!-- Metrics Grid -->
<h2>On-Chain Metrics</h2>
<div class="metrics-grid" id="metricsGrid">
@ -748,6 +756,28 @@ async function doRefresh() {
drawScoreRing(0);
poll();
setInterval(poll, 30000);
// Load historical context from backtest
(async function() {
try {
const r = await fetch('/api/backtest/status');
const s = await r.json();
if (!s.exists) return;
const br = await fetch('/api/backtest');
const bt = await br.json();
if (bt.error || !bt.current_context) return;
const ctx = bt.current_context;
const el = document.getElementById('histContext');
const txt = document.getElementById('histContextText');
let html = 'Score <strong>' + ctx.current_score + '</strong> is in the <strong style="color:#22d3ee">top ' + (100 - ctx.percentile).toFixed(1) + '%</strong> historically.';
if (ctx.avg_1yr_return != null) {
const c = ctx.avg_1yr_return >= 0 ? '#22c55e' : '#ef4444';
html += ' Average 1-year return from this level: <strong style="color:' + c + '">' + (ctx.avg_1yr_return >= 0 ? '+' : '') + ctx.avg_1yr_return.toFixed(1) + '%</strong>';
}
txt.innerHTML = html;
el.style.display = 'block';
} catch(e) { /* backtest data not available yet */ }
})();
</script>
</body>
</html>"""
@ -929,11 +959,461 @@ loadSettings();
</html>"""
# ── Backtest API ───────────────────────────────────────────────────────
_history_collector_running = False
_history_collector_progress = {}
@app.get("/api/backtest")
def api_backtest():
"""Run backtest and return full results."""
try:
from backtesting.engine import run_backtest
return run_backtest()
except Exception as e:
log.error("Backtest error: %s", traceback.format_exc())
return JSONResponse({"error": str(e)}, status_code=500)
@app.get("/api/backtest/history")
def api_backtest_history():
"""Return historical daily scores + prices for charting."""
try:
from backtesting.engine import run_backtest
result = run_backtest()
return {"chart_data": result.get("chart_data", []), "date_range": result.get("date_range")}
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)
@app.post("/api/backtest/collect")
def api_backtest_collect():
"""Trigger historical data collection."""
global _history_collector_running, _history_collector_progress
if _history_collector_running:
return JSONResponse({"error": "Collection already in progress", "progress": _history_collector_progress}, status_code=409)
def _run_collector():
global _history_collector_running, _history_collector_progress
_history_collector_running = True
_history_collector_progress = {"status": "starting", "current": "", "step": 0, "total": 0}
try:
from scrapers.history_collector import collect_all_history
def progress_cb(metric, step, total):
_history_collector_progress = {"status": "scraping", "current": metric, "step": step + 1, "total": total}
collect_all_history(progress_cb=progress_cb)
_history_collector_progress = {"status": "complete"}
except Exception as e:
log.error("History collection error: %s", traceback.format_exc())
_history_collector_progress = {"status": "error", "error": str(e)}
finally:
_history_collector_running = False
t = threading.Thread(target=_run_collector, daemon=True)
t.start()
return {"ok": True, "message": "Collection started"}
@app.get("/api/backtest/status")
def api_backtest_status():
"""Check if historical data exists and collection status."""
from scrapers.history_collector import history_status
status = history_status()
status["collecting"] = _history_collector_running
status["progress"] = _history_collector_progress
return status
# ── Backtest HTML Page ─────────────────────────────────────────────────
BACKTEST_HTML = """<!DOCTYPE html>
<html lang="en">
<head>
""" + SHARED_HEAD + """
<title>Historical Backtest Bitcoin Accumulation Zone Monitor</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.4/dist/chart.umd.min.js"></script>
<style>
""" + SHARED_CSS + """
.section{margin-bottom:24px}
.section h2{margin-bottom:12px}
.collect-banner{background:var(--card);border:1px solid var(--border);border-radius:10px;padding:24px;text-align:center;margin-bottom:24px}
.collect-banner p{color:var(--text-dim);margin:8px 0 16px}
.progress-bar{width:100%;height:8px;background:var(--bg);border-radius:4px;overflow:hidden;margin:12px 0}
.progress-fill{height:100%;background:var(--cyan);border-radius:4px;transition:width .3s}
.progress-text{font-size:.8rem;color:var(--text-dim);font-family:var(--mono)}
table{width:100%;border-collapse:collapse;font-size:.82rem;font-family:var(--mono)}
th{text-align:left;padding:10px 8px;border-bottom:2px solid var(--border);color:var(--text-dim);font-size:.7rem;text-transform:uppercase;letter-spacing:.06em;white-space:nowrap}
td{padding:8px;border-bottom:1px solid var(--border)}
tr:hover td{background:var(--card-hover)}
.t-green{color:var(--green)}.t-red{color:var(--red)}.t-yellow{color:var(--yellow)}.t-cyan{color:var(--cyan)}
.chart-dual{position:relative;height:400px}
.context-box{background:var(--card);border:1px solid var(--cyan);border-radius:10px;padding:20px}
.context-score{font-size:2.5rem;font-weight:800;font-family:var(--mono);margin-bottom:4px}
.context-percentile{font-size:1rem;color:var(--cyan);margin-bottom:12px}
.context-return{font-size:1.1rem;color:var(--green);font-weight:600;margin-bottom:12px}
.comparable-list{margin-top:12px}
.comparable-item{display:flex;justify-content:space-between;padding:6px 0;border-bottom:1px solid var(--border);font-size:.82rem;font-family:var(--mono)}
.signal-card{background:var(--card);border-radius:8px;padding:12px;border:1px solid var(--border);margin-bottom:8px}
.signal-header{display:flex;justify-content:space-between;align-items:center;margin-bottom:6px}
.signal-date{font-weight:700;color:var(--accent);font-family:var(--mono)}
.signal-score{font-weight:800;font-family:var(--mono);padding:2px 8px;border-radius:4px;font-size:.85rem}
.signal-returns{display:flex;gap:16px;font-size:.8rem;font-family:var(--mono)}
.loading-spinner{display:inline-block;width:20px;height:20px;border:3px solid var(--border);border-top-color:var(--cyan);border-radius:50%;animation:spin .6s linear infinite}
@keyframes spin{to{transform:rotate(360deg)}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<div>
<h1><span class="btc">&#x20BF;</span> Accumulation Zone Monitor</h1>
<div style="margin-top:8px;display:flex;align-items:center;gap:12px">
""" + NAV_HTML + """
</div>
</div>
</div>
<!-- Collection Banner (shown if no data) -->
<div id="collectBanner" class="collect-banner" style="display:none">
<h2>Historical Data Required</h2>
<p>Scrape full historical time series from LookIntoBitcoin charts, CoinGecko, and Fear & Greed Index.<br>This takes several minutes (10+ charts to scrape).</p>
<button class="btn btn-cyan" id="btnCollect" onclick="startCollection()">Collect Historical Data</button>
<div id="progressArea" style="display:none;margin-top:16px">
<div class="progress-bar"><div class="progress-fill" id="progressFill" style="width:0%"></div></div>
<div class="progress-text" id="progressText">Starting...</div>
</div>
</div>
<!-- Main content (shown after data exists) -->
<div id="mainContent" style="display:none">
<!-- Section 1: Current Signal Context -->
<div class="section">
<div class="context-box" id="contextBox">
<h2>Current Signal Context</h2>
<div class="context-score" id="ctxScore">--</div>
<div class="context-percentile" id="ctxPercentile">Loading...</div>
<div class="context-return" id="ctxReturn"></div>
<div class="comparable-list" id="ctxComparables"></div>
</div>
</div>
<!-- Section 2: Historical Score vs Price Chart -->
<div class="section">
<div class="card">
<h2>Historical Score vs BTC Price</h2>
<div class="chart-dual">
<canvas id="dualChart"></canvas>
</div>
</div>
</div>
<!-- Section 3: Score Bracket Performance -->
<div class="section">
<div class="card">
<h2>Score Bracket Performance</h2>
<div style="overflow-x:auto">
<table id="bracketTable">
<thead>
<tr>
<th>Score Range</th><th>Label</th><th>Days</th>
<th>Avg 30d</th><th>Avg 90d</th><th>Avg 1yr</th>
<th>Win Rate (1yr)</th><th>Max Gain (1yr)</th><th>Max Loss (1yr)</th>
<th>Avg Max DD</th>
</tr>
</thead>
<tbody id="bracketBody"></tbody>
</table>
</div>
</div>
</div>
<!-- Section 4: Major Signal Events -->
<div class="section">
<div class="card">
<h2>Major Signal Events (Score Crossed 70/80/90+)</h2>
<div id="signalEvents"></div>
</div>
</div>
</div><!-- /mainContent -->
<div class="footer">Bitcoin Accumulation Zone Monitor &mdash; Historical Backtest Engine</div>
</div>
<script>
document.getElementById('nav-backtest').classList.add('active');
""" + TOAST_JS + """
let backtestData = null;
function fmtPct(v) { if (v == null) return '--'; return (v >= 0 ? '+' : '') + v.toFixed(1) + '%'; }
function fmtPrice(v) { if (v == null) return '--'; return '$' + Math.round(v).toLocaleString(); }
function retClass(v) { if (v == null) return ''; return v >= 0 ? 't-green' : 't-red'; }
function scoreColorCSS(score) {
if (score >= 71) return '#22c55e';
if (score >= 51) return '#4ade80';
if (score >= 31) return '#eab308';
if (score >= 15) return '#f97316';
return '#ef4444';
}
async function checkStatus() {
try {
const r = await fetch('/api/backtest/status');
const s = await r.json();
if (s.exists && !s.collecting) {
document.getElementById('collectBanner').style.display = 'none';
document.getElementById('mainContent').style.display = 'block';
loadBacktest();
} else if (s.collecting) {
document.getElementById('collectBanner').style.display = 'block';
document.getElementById('progressArea').style.display = 'block';
document.getElementById('btnCollect').disabled = true;
document.getElementById('btnCollect').textContent = 'Collecting...';
const p = s.progress || {};
if (p.total > 0) {
const pct = Math.round((p.step / (p.total + 2)) * 100);
document.getElementById('progressFill').style.width = pct + '%';
document.getElementById('progressText').textContent = p.current + ' (' + p.step + '/' + p.total + ')';
} else {
document.getElementById('progressText').textContent = p.status || 'Working...';
}
if (p.status === 'complete') {
document.getElementById('collectBanner').style.display = 'none';
document.getElementById('mainContent').style.display = 'block';
loadBacktest();
return;
}
setTimeout(checkStatus, 3000);
} else {
document.getElementById('collectBanner').style.display = 'block';
}
} catch(e) { console.error(e); }
}
async function startCollection() {
document.getElementById('btnCollect').disabled = true;
document.getElementById('btnCollect').textContent = 'Starting...';
document.getElementById('progressArea').style.display = 'block';
try {
const r = await fetch('/api/backtest/collect', { method: 'POST' });
const d = await r.json();
if (d.error) { showToast(d.error, 'error'); return; }
showToast('Collection started — this will take several minutes', 'success');
setTimeout(checkStatus, 3000);
} catch(e) { showToast('Failed: ' + e, 'error'); }
}
async function loadBacktest() {
try {
document.getElementById('mainContent').innerHTML = '<div style="text-align:center;padding:60px;color:var(--text-dim)"><div class="loading-spinner"></div><p style="margin-top:16px">Running backtest analysis...</p></div>';
const r = await fetch('/api/backtest');
backtestData = await r.json();
if (backtestData.error) {
document.getElementById('mainContent').innerHTML = '<div class="card" style="text-align:center;padding:40px;color:var(--red)">' + backtestData.error + '</div>';
return;
}
renderAll();
} catch(e) { console.error(e); }
}
function renderAll() {
// Rebuild the main content
document.getElementById('mainContent').innerHTML = `
<div class="section"><div class="context-box" id="contextBox"><h2>Current Signal Context</h2><div class="context-score" id="ctxScore">--</div><div class="context-percentile" id="ctxPercentile"></div><div class="context-return" id="ctxReturn"></div><div class="comparable-list" id="ctxComparables"></div></div></div>
<div class="section"><div class="card"><h2>Historical Score vs BTC Price</h2><div class="chart-dual"><canvas id="dualChart"></canvas></div></div></div>
<div class="section"><div class="card"><h2>Score Bracket Performance</h2><div style="overflow-x:auto"><table><thead><tr><th>Score Range</th><th>Label</th><th>Days</th><th>Avg 30d</th><th>Avg 90d</th><th>Avg 1yr</th><th>Win Rate (1yr)</th><th>Max Gain</th><th>Max Loss</th><th>Avg Max DD</th></tr></thead><tbody id="bracketBody"></tbody></table></div></div></div>
<div class="section"><div class="card"><h2>Major Signal Events (Score Crossed 70/80/90+)</h2><div id="signalEvents"></div></div></div>
`;
renderContext();
renderDualChart();
renderBracketTable();
renderSignalEvents();
}
function renderContext() {
const ctx = backtestData.current_context;
if (!ctx) return;
const el = document.getElementById('ctxScore');
el.textContent = ctx.current_score + ' / 100';
el.style.color = scoreColorCSS(ctx.current_score);
document.getElementById('ctxPercentile').textContent =
'Historical percentile: top ' + (100 - ctx.percentile).toFixed(1) + '% of all days (' + ctx.comparable_days + ' comparable days found)';
if (ctx.avg_1yr_return != null) {
document.getElementById('ctxReturn').textContent =
'Average 1-year return from this score level: ' + fmtPct(ctx.avg_1yr_return);
document.getElementById('ctxReturn').className = 'context-return ' + retClass(ctx.avg_1yr_return);
}
// Examples
const list = document.getElementById('ctxComparables');
if (ctx.examples && ctx.examples.length) {
let html = '<h2 style="margin-top:12px">Comparable Historical Periods</h2>';
for (const ex of ctx.examples) {
const fr = ex.forward_returns || {};
html += '<div class="comparable-item"><span>' + ex.date + ' — Score ' + ex.score + '' + fmtPrice(ex.price) + '</span>';
html += '<span>';
if (fr['30d'] != null) html += '<span class="' + retClass(fr['30d']) + '">30d: ' + fmtPct(fr['30d']) + '</span> ';
if (fr['365d'] != null) html += '<span class="' + retClass(fr['365d']) + '">1yr: ' + fmtPct(fr['365d']) + '</span>';
html += '</span></div>';
}
list.innerHTML = html;
}
}
function renderDualChart() {
const chart = backtestData.chart_data;
if (!chart || !chart.length) return;
const ctx = document.getElementById('dualChart').getContext('2d');
const labels = chart.map(d => d.date);
const scores = chart.map(d => d.score);
const prices = chart.map(d => d.price);
// Zone backgrounds via plugin
const zonePlugin = {
id: 'zoneBackground',
beforeDraw(chart) {
const { ctx: c, chartArea: {left, right, top, bottom}, scales: {y} } = chart;
if (!y) return;
const zones = [
{ min: 0, max: 40, color: 'rgba(239,68,68,0.06)' },
{ min: 40, max: 70, color: 'rgba(234,179,8,0.06)' },
{ min: 70, max: 100, color: 'rgba(34,197,94,0.08)' },
];
for (const z of zones) {
const yTop = y.getPixelForValue(Math.min(z.max, 100));
const yBot = y.getPixelForValue(z.min);
c.fillStyle = z.color;
c.fillRect(left, yTop, right - left, yBot - yTop);
}
}
};
new Chart(ctx, {
type: 'line',
plugins: [zonePlugin],
data: {
labels,
datasets: [
{
label: 'Accumulation Score',
data: scores,
borderColor: '#f7931a',
backgroundColor: 'rgba(247,147,26,0.1)',
borderWidth: 1.5,
fill: false,
tension: 0.2,
pointRadius: 0,
yAxisID: 'y',
},
{
label: 'BTC Price (USD)',
data: prices,
borderColor: '#22d3ee',
borderWidth: 1.5,
fill: false,
tension: 0.2,
pointRadius: 0,
yAxisID: 'y1',
}
]
},
options: {
responsive: true,
maintainAspectRatio: false,
interaction: { mode: 'index', intersect: false },
plugins: {
legend: { labels: { color: '#94a3b8', font: { size: 11 } } },
tooltip: {
callbacks: {
label: function(ctx) {
if (ctx.datasetIndex === 1) return 'BTC: $' + Math.round(ctx.parsed.y).toLocaleString();
return 'Score: ' + ctx.parsed.y;
}
}
}
},
scales: {
x: { ticks: { color: '#94a3b8', maxTicksLimit: 20, maxRotation: 45 }, grid: { color: '#1e293b' } },
y: { position: 'left', min: 0, max: 100, ticks: { color: '#f7931a' }, grid: { color: '#1e293b' },
title: { display: true, text: 'Score (0-100)', color: '#f7931a' } },
y1: { position: 'right', type: 'logarithmic', ticks: { color: '#22d3ee', callback: v => '$' + v.toLocaleString() },
grid: { drawOnChartArea: false }, title: { display: true, text: 'BTC Price (log)', color: '#22d3ee' } }
}
}
});
}
function renderBracketTable() {
const brackets = backtestData.bracket_stats;
if (!brackets) return;
const tbody = document.getElementById('bracketBody');
let html = '';
for (const b of brackets) {
const rowColor = b.days === 0 ? '' : (b.avg_365d > 50 ? 'style="background:rgba(34,197,94,0.08)"' : b.avg_365d < 0 ? 'style="background:rgba(239,68,68,0.08)"' : '');
html += '<tr ' + rowColor + '>';
html += '<td style="font-weight:700">' + b.range + '</td>';
html += '<td>' + b.label + '</td>';
html += '<td>' + b.days + '</td>';
html += '<td class="' + retClass(b.avg_30d) + '">' + fmtPct(b.avg_30d) + '</td>';
html += '<td class="' + retClass(b.avg_90d) + '">' + fmtPct(b.avg_90d) + '</td>';
html += '<td class="' + retClass(b.avg_365d) + '">' + fmtPct(b.avg_365d) + '</td>';
html += '<td>' + (b.win_rate_365d != null ? b.win_rate_365d + '%' : '--') + '</td>';
html += '<td class="t-green">' + fmtPct(b.max_gain_365d) + '</td>';
html += '<td class="t-red">' + fmtPct(b.max_loss_365d) + '</td>';
html += '<td>' + (b.avg_max_drawdown_90d != null ? b.avg_max_drawdown_90d + '%' : '--') + '</td>';
html += '</tr>';
}
tbody.innerHTML = html;
}
function renderSignalEvents() {
const events = backtestData.signal_events;
if (!events || !events.length) return;
const el = document.getElementById('signalEvents');
let html = '';
// Show most recent events first, limit to 30
const shown = events.slice(-30).reverse();
for (const ev of shown) {
const color = scoreColorCSS(ev.score);
html += '<div class="signal-card">';
html += '<div class="signal-header">';
html += '<span class="signal-date">' + ev.date + ' &mdash; ' + fmtPrice(ev.price) + '</span>';
html += '<span class="signal-score" style="background:' + color + ';color:#000">Score: ' + ev.score + ' (crossed ' + ev.threshold + ')</span>';
html += '</div>';
html += '<div class="signal-returns">';
const fr = ev.forward_returns || {};
if (fr['30d'] != null) html += '<span class="' + retClass(fr['30d']) + '">30d: ' + fmtPct(fr['30d']) + '</span>';
if (fr['90d'] != null) html += '<span class="' + retClass(fr['90d']) + '">90d: ' + fmtPct(fr['90d']) + '</span>';
if (fr['365d'] != null) html += '<span class="' + retClass(fr['365d']) + '">1yr: ' + fmtPct(fr['365d']) + '</span>';
if (ev.price_365d) html += '<span style="color:var(--text-dim)">Price 1yr: ' + fmtPrice(ev.price_365d) + '</span>';
html += '</div></div>';
}
el.innerHTML = html;
}
// Init
checkStatus();
</script>
</body>
</html>"""
@app.get("/", response_class=HTMLResponse)
def dashboard():
return DASHBOARD_HTML
@app.get("/backtest", response_class=HTMLResponse)
def backtest_page():
return BACKTEST_HTML
@app.get("/settings", response_class=HTMLResponse)
def settings_page():
return SETTINGS_HTML

View File

@ -0,0 +1,276 @@
"""Collect full historical time series from LookIntoBitcoin charts, CoinGecko, and Fear & Greed."""
import json
import logging
import os
import time
from datetime import datetime
import requests
log = logging.getLogger(__name__)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
HISTORY_PATH = os.path.join(BASE_DIR, "data", "history.json")
# Charts to scrape with expected trace names
CHART_CONFIGS = {
"puell_multiple": {
"path": "/charts/puell-multiple/",
"traces": {"puell_multiple": "Puell Multiple", "btc_price": "Price"},
},
"mvrv_zscore": {
"path": "/charts/mvrv-zscore/",
"traces": {"mvrv_zscore": "Z-Score"},
},
"reserve_risk": {
"path": "/charts/reserve-risk/",
"traces": {"reserve_risk": "Reserve Risk"},
},
"rhodl_ratio": {
"path": "/charts/rhodl-ratio/",
"traces": {"rhodl_ratio": "RHODL Ratio"},
},
"nupl": {
"path": "/charts/relative-unrealized-profit--loss/",
"traces": {"nupl": "NUPL"},
},
"200w_sma": {
"path": "/charts/200-week-moving-average-heatmap/",
"traces": {"200w_sma": "200 Week Moving Average", "btc_price_sma": "Price"},
},
"lth_realized_price": {
"path": "/charts/long-term-holder-realized-price/",
"traces": {"lth_realized_price": "Long-Term Holder Realized Price", "btc_price_lth": "Price"},
},
"lth_supply": {
"path": "/charts/long-term-holder-supply/",
"traces": {"lth_supply": None}, # None = grab first numeric trace
},
}
def _find_trace(traces, name):
"""Find a trace by name (case-insensitive partial match)."""
if not traces or not name:
return None
name_lower = name.lower()
for t in traces:
trace_name = t.get("name", "").lower()
if name_lower in trace_name or trace_name in name_lower:
return t
words = name_lower.split()
for t in traces:
trace_name = t.get("name", "").lower()
if all(w in trace_name for w in words):
return t
return None
def _extract_series(trace):
"""Extract (dates, values) from a Plotly trace dict."""
if not trace:
return [], []
x = trace.get("x", [])
y = trace.get("y", [])
dates = []
values = []
for i, (d, v) in enumerate(zip(x, y)):
if v is None:
continue
try:
val = float(v)
except (ValueError, TypeError):
continue
# Normalize date string to YYYY-MM-DD
date_str = str(d)[:10]
dates.append(date_str)
values.append(val)
return dates, values
def scrape_chart_history(chart_path):
"""Scrape a chart and return all trace data."""
from scrapers.lookintobitcoin import scrape_chart
return scrape_chart(chart_path)
def collect_onchain_history(progress_cb=None):
"""Scrape all on-chain charts and return dict of {metric: {dates, values}}."""
result = {}
total = len(CHART_CONFIGS)
for idx, (chart_key, cfg) in enumerate(CHART_CONFIGS.items()):
label = f"[{idx+1}/{total}] {chart_key}"
log.info("Scraping history: %s", label)
if progress_cb:
progress_cb(chart_key, idx, total)
try:
traces = scrape_chart_history(cfg["path"])
if not traces:
log.warning("No traces for %s", chart_key)
continue
for metric_key, trace_name in cfg["traces"].items():
if trace_name is None:
# Grab first trace with numeric data
for candidate in traces:
y = candidate.get("y", [])
if y and any(v is not None for v in y[-10:]):
dates, values = _extract_series(candidate)
if dates:
result[metric_key] = {"dates": dates, "values": values}
log.info(" %s: %d data points", metric_key, len(dates))
break
else:
t = _find_trace(traces, trace_name)
if not t:
# Fallback: try BTC Price
if "btc_price" in metric_key or "price" in trace_name.lower():
t = _find_trace(traces, "BTC") or _find_trace(traces, "Price")
if not t:
log.warning(" Trace '%s' not found for %s", trace_name, metric_key)
continue
dates, values = _extract_series(t)
if dates:
result[metric_key] = {"dates": dates, "values": values}
log.info(" %s: %d data points (%s to %s)", metric_key, len(dates), dates[0], dates[-1])
else:
log.warning(" %s: no valid data points", metric_key)
except Exception as e:
log.error("Error scraping %s: %s", chart_key, e)
# Be polite between requests
if idx < total - 1:
time.sleep(2)
return result
def collect_price_history():
"""Fetch BTC price history from CoinGecko (max history)."""
log.info("Fetching BTC price history from CoinGecko...")
try:
resp = requests.get(
"https://api.coingecko.com/api/v3/coins/bitcoin/market_chart",
params={"vs_currency": "usd", "days": "max"},
timeout=30,
)
resp.raise_for_status()
data = resp.json()
prices = data.get("prices", [])
dates = []
values = []
seen_dates = set()
for ts_ms, price in prices:
d = datetime.utcfromtimestamp(ts_ms / 1000).strftime("%Y-%m-%d")
if d not in seen_dates:
seen_dates.add(d)
dates.append(d)
values.append(round(price, 2))
log.info("CoinGecko BTC price: %d days (%s to %s)", len(dates), dates[0] if dates else "?", dates[-1] if dates else "?")
return {"dates": dates, "values": values}
except Exception as e:
log.error("CoinGecko price fetch failed: %s", e)
return None
def collect_fear_greed_history():
"""Fetch full Fear & Greed history from alternative.me."""
log.info("Fetching Fear & Greed history...")
try:
resp = requests.get(
"https://api.alternative.me/fng/",
params={"limit": "0"},
timeout=30,
)
resp.raise_for_status()
data = resp.json().get("data", [])
dates = []
values = []
for entry in reversed(data): # API returns newest first
ts = int(entry["timestamp"])
d = datetime.utcfromtimestamp(ts).strftime("%Y-%m-%d")
dates.append(d)
values.append(int(entry["value"]))
log.info("Fear & Greed: %d days (%s to %s)", len(dates), dates[0] if dates else "?", dates[-1] if dates else "?")
return {"dates": dates, "values": values}
except Exception as e:
log.error("Fear & Greed fetch failed: %s", e)
return None
def collect_all_history(progress_cb=None):
"""Collect all historical data and save to history.json."""
log.info("=== Starting full historical data collection ===")
history = {}
# 1. On-chain metrics from LookIntoBitcoin
onchain = collect_onchain_history(progress_cb=progress_cb)
history.update(onchain)
# 2. BTC price from CoinGecko
price = collect_price_history()
if price:
history["btc_price_coingecko"] = price
# 3. Fear & Greed
fng = collect_fear_greed_history()
if fng:
history["fear_greed"] = fng
# Merge BTC price: prefer the LookIntoBitcoin trace (goes to 2010), fill gaps with CoinGecko
btc_keys = [k for k in history if "btc_price" in k]
if btc_keys:
# Use longest series as base
best = max(btc_keys, key=lambda k: len(history[k]["dates"]))
history["btc_price"] = history[best]
log.info("BTC price source: %s (%d days)", best, len(history[best]["dates"]))
# Add metadata
history["_metadata"] = {
"collected_at": datetime.utcnow().isoformat() + "Z",
"metrics": list(k for k in history if not k.startswith("_")),
"metric_counts": {k: len(v["dates"]) for k, v in history.items() if isinstance(v, dict) and "dates" in v},
}
# Save
os.makedirs(os.path.dirname(HISTORY_PATH), exist_ok=True)
with open(HISTORY_PATH, "w") as f:
json.dump(history, f, separators=(",", ":"))
size_mb = os.path.getsize(HISTORY_PATH) / 1024 / 1024
log.info("=== History saved to %s (%.1f MB) ===", HISTORY_PATH, size_mb)
log.info("Metrics collected: %s", ", ".join(k for k in history if not k.startswith("_")))
return history
def load_history():
"""Load history from disk."""
if not os.path.exists(HISTORY_PATH):
return None
with open(HISTORY_PATH) as f:
return json.load(f)
def history_status():
"""Check if history exists and return metadata."""
if not os.path.exists(HISTORY_PATH):
return {"exists": False}
try:
stat = os.stat(HISTORY_PATH)
with open(HISTORY_PATH) as f:
data = json.load(f)
meta = data.get("_metadata", {})
return {
"exists": True,
"collected_at": meta.get("collected_at"),
"metrics": meta.get("metrics", []),
"metric_counts": meta.get("metric_counts", {}),
"size_mb": round(stat.st_size / 1024 / 1024, 2),
}
except Exception as e:
return {"exists": True, "error": str(e)}