# Commit summary (covers more than this file):
# - scrapers/history_collector.py: scrapes full time series from 8 LookIntoBitcoin
#   charts + Fear & Greed API, stores to data/history.json (~5700 days back to 2010)
# - backtesting/engine.py: scores each historical day using same thresholds as live
#   scoring, computes 30d/90d/180d/1yr forward returns, bracket stats, signal events
# - dashboard/server.py: adds /backtest page with dual-axis score vs price chart,
#   bracket performance table, signal event list, current context box; adds backtest
#   nav link and historical context box on main dashboard; 4 new API endpoints
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# File listing: 277 lines, 9.5 KiB, Python
"""Collect full historical time series from LookIntoBitcoin charts, CoinGecko, and Fear & Greed."""
|
|
|
|
import json
import logging
import math
import os
import time
from datetime import datetime, timezone

import requests
|
|
|
|
# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Directory one level above the directory that contains this file.
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
# Location of the collected history file: <BASE_DIR>/data/history.json
HISTORY_PATH = os.path.join(BASE_DIR, "data", "history.json")
|
|
|
|
# Charts to scrape.
#
# Each entry maps a chart key to:
#   "path"   - the chart page path handed to the scraper
#   "traces" - {metric key to store under: trace display name to look for};
#              a display name of None means "grab the first numeric trace".
CHART_CONFIGS = {
    "puell_multiple": {
        "path": "/charts/puell-multiple/",
        "traces": {
            "puell_multiple": "Puell Multiple",
            "btc_price": "Price",
        },
    },
    "mvrv_zscore": {
        "path": "/charts/mvrv-zscore/",
        "traces": {
            "mvrv_zscore": "Z-Score",
        },
    },
    "reserve_risk": {
        "path": "/charts/reserve-risk/",
        "traces": {
            "reserve_risk": "Reserve Risk",
        },
    },
    "rhodl_ratio": {
        "path": "/charts/rhodl-ratio/",
        "traces": {
            "rhodl_ratio": "RHODL Ratio",
        },
    },
    "nupl": {
        "path": "/charts/relative-unrealized-profit--loss/",
        "traces": {
            "nupl": "NUPL",
        },
    },
    "200w_sma": {
        "path": "/charts/200-week-moving-average-heatmap/",
        "traces": {
            "200w_sma": "200 Week Moving Average",
            "btc_price_sma": "Price",
        },
    },
    "lth_realized_price": {
        "path": "/charts/long-term-holder-realized-price/",
        "traces": {
            "lth_realized_price": "Long-Term Holder Realized Price",
            "btc_price_lth": "Price",
        },
    },
    "lth_supply": {
        "path": "/charts/long-term-holder-supply/",
        "traces": {
            "lth_supply": None,  # None = grab first numeric trace
        },
    },
}
|
|
|
|
|
|
def _find_trace(traces, name):
|
|
"""Find a trace by name (case-insensitive partial match)."""
|
|
if not traces or not name:
|
|
return None
|
|
name_lower = name.lower()
|
|
for t in traces:
|
|
trace_name = t.get("name", "").lower()
|
|
if name_lower in trace_name or trace_name in name_lower:
|
|
return t
|
|
words = name_lower.split()
|
|
for t in traces:
|
|
trace_name = t.get("name", "").lower()
|
|
if all(w in trace_name for w in words):
|
|
return t
|
|
return None
|
|
|
|
|
|
def _extract_series(trace):
|
|
"""Extract (dates, values) from a Plotly trace dict."""
|
|
if not trace:
|
|
return [], []
|
|
x = trace.get("x", [])
|
|
y = trace.get("y", [])
|
|
dates = []
|
|
values = []
|
|
for i, (d, v) in enumerate(zip(x, y)):
|
|
if v is None:
|
|
continue
|
|
try:
|
|
val = float(v)
|
|
except (ValueError, TypeError):
|
|
continue
|
|
# Normalize date string to YYYY-MM-DD
|
|
date_str = str(d)[:10]
|
|
dates.append(date_str)
|
|
values.append(val)
|
|
return dates, values
|
|
|
|
|
|
def scrape_chart_history(chart_path):
    """Scrape one chart page and return whatever trace data the scraper yields.

    Thin wrapper: ``chart_path`` is passed straight to
    ``scrapers.lookintobitcoin.scrape_chart`` and its result is returned
    unchanged.
    """
    # The scraper is imported at call time, not at module import time.
    from scrapers.lookintobitcoin import scrape_chart as _scrape

    traces = _scrape(chart_path)
    return traces
|
|
|
|
|
|
def _first_numeric_series(traces):
    """Return ``(dates, values)`` from the first trace that yields numeric points."""
    for candidate in traces:
        y = candidate.get("y", [])
        # Cheap pre-check: only parse traces whose last 10 points are not all null.
        if y and any(v is not None for v in y[-10:]):
            dates, values = _extract_series(candidate)
            if dates:
                return dates, values
    return [], []


def _locate_named_trace(traces, metric_key, trace_name):
    """Find the trace for ``trace_name``, falling back to a BTC/Price trace for price metrics."""
    trace = _find_trace(traces, trace_name)
    if not trace and ("btc_price" in metric_key or "price" in trace_name.lower()):
        trace = _find_trace(traces, "BTC") or _find_trace(traces, "Price")
    return trace


def _iter_chart_series(traces, wanted):
    """Yield ``(metric_key, dates, values)`` for every configured metric found in ``traces``."""
    for metric_key, trace_name in wanted.items():
        if trace_name is None:
            # No trace name configured: take the first trace with numeric data.
            dates, values = _first_numeric_series(traces)
            if dates:
                log.info(" %s: %d data points", metric_key, len(dates))
                yield metric_key, dates, values
            else:
                log.warning(" %s: no valid data points", metric_key)
            continue

        trace = _locate_named_trace(traces, metric_key, trace_name)
        if not trace:
            log.warning(" Trace '%s' not found for %s", trace_name, metric_key)
            continue

        dates, values = _extract_series(trace)
        if dates:
            log.info(" %s: %d data points (%s to %s)", metric_key, len(dates), dates[0], dates[-1])
            yield metric_key, dates, values
        else:
            log.warning(" %s: no valid data points", metric_key)


def collect_onchain_history(progress_cb=None):
    """Scrape every chart in ``CHART_CONFIGS`` and collect its configured metrics.

    Charts are processed one at a time in configuration order. A chart that
    raises, returns no traces, or lacks a configured trace is logged and
    skipped. Metrics already extracted from a chart are kept even if a later
    metric of the same chart fails. A 2-second pause follows every chart
    except the last, including charts that failed or returned nothing, so
    requests are always spaced out.

    Args:
        progress_cb: Optional callable invoked before each chart is scraped as
            ``progress_cb(chart_key, idx, total)``, where ``idx`` is the
            zero-based chart index and ``total`` the number of charts.

    Returns:
        Dict mapping metric key to ``{"dates": [...], "values": [...]}``.
    """
    result = {}
    total = len(CHART_CONFIGS)

    for idx, (chart_key, cfg) in enumerate(CHART_CONFIGS.items()):
        label = f"[{idx+1}/{total}] {chart_key}"
        log.info("Scraping history: %s", label)
        if progress_cb:
            progress_cb(chart_key, idx, total)

        # Best-effort boundary: one broken chart must not abort the whole run.
        try:
            traces = scrape_chart_history(cfg["path"])
            if traces:
                for metric_key, dates, values in _iter_chart_series(traces, cfg["traces"]):
                    result[metric_key] = {"dates": dates, "values": values}
            else:
                log.warning("No traces for %s", chart_key)
        except Exception as e:
            log.error("Error scraping %s: %s", chart_key, e)

        # Be polite between requests, regardless of how this chart went.
        if idx < total - 1:
            time.sleep(2)

    return result
|
|
|
|
|
|
def collect_price_history():
    """Fetch BTC/USD price history from CoinGecko and reduce it to one point per day.

    Requests the ``market_chart`` endpoint with ``days=max``. Each returned
    ``[timestamp_ms, price]`` pair is bucketed by its UTC calendar date. Only
    the first point seen for a date is kept, with the price rounded to
    2 decimals. Points with a null price are skipped instead of failing the
    whole fetch.

    Returns:
        ``{"dates": [...], "values": [...]}`` with ``YYYY-MM-DD`` date strings,
        or ``None`` if the request or the parsing fails (the error is logged).
    """
    log.info("Fetching BTC price history from CoinGecko...")
    try:
        resp = requests.get(
            "https://api.coingecko.com/api/v3/coins/bitcoin/market_chart",
            params={"vs_currency": "usd", "days": "max"},
            timeout=30,
        )
        resp.raise_for_status()
        prices = resp.json().get("prices", [])

        dates = []
        values = []
        seen_dates = set()
        for ts_ms, price in prices:
            if price is None:
                continue
            # Timezone-aware conversion; utcfromtimestamp() is deprecated.
            day = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d")
            if day in seen_dates:
                continue
            seen_dates.add(day)
            dates.append(day)
            values.append(round(price, 2))

        log.info("CoinGecko BTC price: %d days (%s to %s)", len(dates), dates[0] if dates else "?", dates[-1] if dates else "?")
        return {"dates": dates, "values": values}
    except Exception as e:
        # Best-effort source: report the failure and let the caller go on without it.
        log.error("CoinGecko price fetch failed: %s", e)
        return None
|
|
|
|
|
|
def collect_fear_greed_history():
    """Fetch the Fear & Greed index history from alternative.me.

    Requests the ``fng`` endpoint with ``limit=0`` and walks the returned
    ``data`` list in reverse, because the API returns newest first, so the
    result is oldest-first. Each entry's ``timestamp`` (seconds) is converted
    to a UTC ``YYYY-MM-DD`` date and its ``value`` to an int. Entries that
    lack either field or hold non-numeric data are skipped instead of
    discarding the whole history.

    Returns:
        ``{"dates": [...], "values": [...]}``, or ``None`` if the request or
        the parsing fails (the error is logged).
    """
    log.info("Fetching Fear & Greed history...")
    try:
        resp = requests.get(
            "https://api.alternative.me/fng/",
            params={"limit": "0"},
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json().get("data", [])

        dates = []
        values = []
        for entry in reversed(data):  # API returns newest first
            try:
                ts = int(entry["timestamp"])
                value = int(entry["value"])
            except (KeyError, TypeError, ValueError):
                continue
            # Timezone-aware conversion; utcfromtimestamp() is deprecated.
            dates.append(datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d"))
            values.append(value)

        log.info("Fear & Greed: %d days (%s to %s)", len(dates), dates[0] if dates else "?", dates[-1] if dates else "?")
        return {"dates": dates, "values": values}
    except Exception as e:
        # Best-effort source: report the failure and let the caller go on without it.
        log.error("Fear & Greed fetch failed: %s", e)
        return None
|
|
|
|
|
|
def collect_all_history(progress_cb=None):
    """Collect every historical series and save the result to ``HISTORY_PATH``.

    Steps:
        1. On-chain metrics via ``collect_onchain_history``.
        2. CoinGecko BTC price, stored as ``"btc_price_coingecko"`` when available.
        3. Fear & Greed index, stored as ``"fear_greed"`` when available.
        4. ``"btc_price"`` is set to whichever collected series with
           ``"btc_price"`` in its key has the most data points. Series are
           not merged.
        5. A ``"_metadata"`` entry records the collection time (UTC), the
           metric names and the number of points per metric.

    The JSON is written to a temporary sibling file and then moved into place
    with ``os.replace``, so a reader never sees a partially written file and
    a failed run leaves any previous history untouched.

    Args:
        progress_cb: Optional progress callback, forwarded to
            ``collect_onchain_history``.

    Returns:
        The complete history dict that was written to disk.
    """
    log.info("=== Starting full historical data collection ===")
    history = {}

    # 1. On-chain metrics from LookIntoBitcoin
    history.update(collect_onchain_history(progress_cb=progress_cb))

    # 2. BTC price from CoinGecko
    price = collect_price_history()
    if price:
        history["btc_price_coingecko"] = price

    # 3. Fear & Greed
    fng = collect_fear_greed_history()
    if fng:
        history["fear_greed"] = fng

    # Canonical BTC price: the longest of the collected price series.
    # No gap-filling between sources is performed.
    btc_keys = [k for k in history if "btc_price" in k]
    if btc_keys:
        best = max(btc_keys, key=lambda k: len(history[k]["dates"]))
        history["btc_price"] = history[best]
        log.info("BTC price source: %s (%d days)", best, len(history[best]["dates"]))

    # Add metadata. The timestamp keeps the naive-ISO-plus-"Z" format while
    # avoiding the deprecated datetime.utcnow().
    collected_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    history["_metadata"] = {
        "collected_at": collected_at,
        "metrics": [k for k in history if not k.startswith("_")],
        "metric_counts": {
            k: len(v["dates"])
            for k, v in history.items()
            if isinstance(v, dict) and "dates" in v
        },
    }

    # Save atomically: dump to a temp file, then rename over the target.
    os.makedirs(os.path.dirname(HISTORY_PATH), exist_ok=True)
    tmp_path = HISTORY_PATH + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(history, f, separators=(",", ":"))
        os.replace(tmp_path, HISTORY_PATH)
    finally:
        # Only still present if the dump or the rename failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

    size_mb = os.path.getsize(HISTORY_PATH) / 1024 / 1024
    log.info("=== History saved to %s (%.1f MB) ===", HISTORY_PATH, size_mb)
    log.info("Metrics collected: %s", ", ".join(k for k in history if not k.startswith("_")))

    return history
|
|
|
|
|
|
def load_history():
    """Load the saved history from ``HISTORY_PATH``.

    The file is opened directly rather than checked with ``os.path.exists``
    first, so a file that disappears between check and open cannot raise.

    Returns:
        The parsed JSON content, or ``None`` if the file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
        OSError: For failures other than a missing file, such as permissions.
    """
    try:
        f = open(HISTORY_PATH, encoding="utf-8")
    except FileNotFoundError:
        return None
    with f:
        return json.load(f)
|
|
|
|
|
|
def history_status():
    """Report whether the history file exists, with a summary of its metadata.

    Returns:
        ``{"exists": False}`` when ``HISTORY_PATH`` is absent. Otherwise a dict
        with ``exists``, ``collected_at``, ``metrics``, ``metric_counts`` (all
        read from the file's ``"_metadata"`` entry) and ``size_mb`` (file size
        rounded to 2 decimals). If reading or parsing fails, returns
        ``{"exists": True, "error": <message>}`` instead.
    """
    if not os.path.exists(HISTORY_PATH):
        return {"exists": False}

    try:
        size_bytes = os.stat(HISTORY_PATH).st_size
        with open(HISTORY_PATH) as fh:
            payload = json.load(fh)
        meta = payload.get("_metadata", {})

        summary = {"exists": True}
        summary["collected_at"] = meta.get("collected_at")
        summary["metrics"] = meta.get("metrics", [])
        summary["metric_counts"] = meta.get("metric_counts", {})
        summary["size_mb"] = round(size_bytes / 1024 / 1024, 2)
        return summary
    except Exception as exc:
        # Any failure is reported in the result, not raised.
        return {"exists": True, "error": str(exc)}
|