BizzleBot 4647c596b3 feat: ML-optimized accumulation scoring with dashboard toggle
Train GradientBoostedClassifier on 2,601 days of historical data
(2018-2025) to find optimal metric weights for identifying the best
long-term buying opportunities. Uses time-series cross-validation
to prevent look-ahead bias.

Key results:
- pct_above_200w_sma: 50.7% weight (was 11.1% equal)
- drawdown: 14.6%, lth_rp: 10.9%, rhodl: 8.9%
- fear_greed demoted from 11.1% to 5.1%
- nupl/mvrv nearly eliminated (0.7-1.8%)

ML Strong Accumulation bracket: avg +210% 1yr (vs +176% classic)

New files: ml/optimizer.py, config/ml_weights.json
Modified: scoring/engine.py (score_all_ml), backtesting/engine.py
(ml_mode), dashboard/server.py (Classic/ML toggle)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 23:18:29 +00:00

494 lines
18 KiB
Python

"""Historical backtest engine for Bitcoin Accumulation Zone scoring."""
import json
import logging
import os
import sys
from collections import defaultdict
from datetime import datetime, timedelta
log = logging.getLogger(__name__)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, BASE_DIR)
HISTORY_PATH = os.path.join(BASE_DIR, "data", "history.json")
CACHE_PATH = os.path.join(BASE_DIR, "data", "cache.json")
# Score brackets matching the dashboard assessment levels
BRACKETS = [
(0, 20, "Extreme Caution"),
(21, 40, "Caution"),
(41, 55, "Neutral"),
(56, 70, "Moderate Opportunity"),
(71, 85, "Strong Accumulation"),
(86, 100, "Extreme Accumulation"),
]
# Scoring thresholds — load from config/thresholds.json (single source of truth)
import os as _os
import json as _json
_THRESH_PATH = _os.path.join(_os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))), "config", "thresholds.json")
try:
with open(_THRESH_PATH) as _f:
_THRESH = _json.load(_f)
except Exception:
_THRESH = {}
METRIC_SCORERS = {
"fear_greed": {"ranges": _THRESH.get("fear_greed", {}).get("ranges", [[0, 15, 10], [15, 30, 8], [30, 45, 5], [45, 55, 3], [55, 75, 1], [75, None, 0]])},
"puell_multiple": {"ranges": _THRESH.get("puell_multiple", {}).get("ranges", [[None, 0.4, 10], [0.4, 0.7, 8], [0.7, 1.0, 5], [1.0, 1.5, 3], [1.5, 2.0, 1], [2.0, None, 0]])},
"mvrv_zscore": {"ranges": _THRESH.get("mvrv_zscore", {}).get("ranges", [[None, 0, 10], [0, 1.0, 8], [1.0, 2.0, 5], [2.0, 3.0, 3], [3.0, 5.0, 1], [5.0, None, 0]])},
"reserve_risk": {"ranges": _THRESH.get("reserve_risk", {}).get("ranges", [[None, 0.002, 10], [0.002, 0.005, 7], [0.005, 0.01, 4], [0.01, 0.02, 2], [0.02, None, 0]])},
"rhodl_ratio": {"ranges": _THRESH.get("rhodl_ratio", {}).get("ranges", [[None, 200, 10], [200, 1000, 7], [1000, 5000, 4], [5000, 20000, 1], [20000, None, 0]])},
"nupl": {"ranges": _THRESH.get("nupl", {}).get("ranges", [[None, 0, 10], [0, 0.3, 8], [0.3, 0.5, 4], [0.5, 0.75, 1], [0.75, None, 0]])},
}
RATIO_SCORERS = {
"price_vs_200w_sma": {
"ranges": _THRESH.get("price_vs_200w_sma", {}).get("ranges", [[None, 0, 10], [0, 30, 7], [30, 60, 5], [60, 100, 2], [100, None, 0]]),
"price_key": "btc_price",
"ref_key": "200w_sma",
},
"lth_realized_price": {
"ranges": _THRESH.get("lth_realized_price", {}).get("ranges", [[None, 0, 10], [0, 30, 7], [30, 80, 5], [80, 150, 3], [150, None, 1]]),
"price_key": "btc_price",
"ref_key": "lth_realized_price",
},
}
DRAWDOWN_RANGES = _THRESH.get("drawdown", {}).get("ranges", [[60, None, 10], [40, 60, 8], [25, 40, 6], [15, 25, 4], [5, 15, 2], [None, 5, 0]])
def _score_range(value, ranges):
"""Score a value using range-based thresholds."""
if value is None:
return None
for low, high, score in ranges:
low_ok = low is None or value >= low
high_ok = high is None or value < high
if low_ok and high_ok:
return score
return 0
def _build_daily_index(history):
"""Build a dict mapping metric_key -> {date_str: value} for fast lookup."""
index = {}
for key, data in history.items():
if key.startswith("_") or not isinstance(data, dict) or "dates" not in data:
continue
lookup = {}
for d, v in zip(data["dates"], data["values"]):
lookup[d] = v
index[key] = lookup
return index
def _get_all_dates(index):
"""Get sorted union of all dates across all metrics."""
all_dates = set()
for lookup in index.values():
all_dates.update(lookup.keys())
return sorted(all_dates)
def _last_known_value(lookup, date, max_lookback=30):
"""Get value for date, or most recent prior value within lookback window."""
if date in lookup:
return lookup[date]
d = datetime.strptime(date, "%Y-%m-%d")
for i in range(1, max_lookback + 1):
prev = (d - timedelta(days=i)).strftime("%Y-%m-%d")
if prev in lookup:
return lookup[prev]
return None
def _compute_ath_series(price_lookup, dates):
"""Compute running ATH and drawdown for each date."""
ath = 0
drawdowns = {}
for d in dates:
p = price_lookup.get(d)
if p is None:
continue
if p > ath:
ath = p
if ath > 0:
drawdowns[d] = ((ath - p) / ath) * 100
return drawdowns
def _load_ml_weights():
"""Load ML weights for ML-optimized scoring mode."""
ml_path = _os.path.join(_os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))), "config", "ml_weights.json")
try:
with open(ml_path) as f:
data = _json.load(f)
return data.get("weights", {})
except Exception:
return {}
# ML weight key mapping (backtest metric keys -> ML weight keys)
_BT_ML_KEY_MAP = {
"fear_greed": "fear_greed",
"puell_multiple": "puell_multiple",
"mvrv_zscore": "mvrv_zscore",
"reserve_risk": "reserve_risk",
"rhodl_ratio": "rhodl_ratio",
"nupl": "nupl",
"price_vs_200w_sma": "pct_above_200w_sma",
"lth_realized_price": "pct_above_lth_rp",
"drawdown": "drawdown",
}
def score_day(date, index, drawdowns, ml_weights=None):
"""Score a single day using all available metrics. Returns (composite_score, individual_scores, n_metrics).
If ml_weights is provided, uses ML-optimized weighting instead of equal weights.
"""
scores = []
details = {}
# Simple range-based metrics
for metric_key, cfg in METRIC_SCORERS.items():
val = _last_known_value(index.get(metric_key, {}), date)
if val is not None:
s = _score_range(val, cfg["ranges"])
if s is not None:
scores.append(s)
details[metric_key] = {"value": val, "score": s}
# Ratio-based metrics (price vs reference)
for metric_key, cfg in RATIO_SCORERS.items():
price_val = _last_known_value(index.get(cfg["price_key"], {}), date)
# Try alternate price keys
if price_val is None:
for pk in ["btc_price_coingecko", "btc_price_sma", "btc_price_lth"]:
price_val = _last_known_value(index.get(pk, {}), date)
if price_val is not None:
break
ref_val = _last_known_value(index.get(cfg["ref_key"], {}), date)
if price_val is not None and ref_val is not None and ref_val > 0:
pct_above = ((price_val - ref_val) / ref_val) * 100
s = _score_range(pct_above, cfg["ranges"])
if s is not None:
scores.append(s)
details[metric_key] = {"value": pct_above, "score": s}
# Drawdown
dd = drawdowns.get(date)
if dd is not None:
s = _score_range(dd, DRAWDOWN_RANGES)
if s is not None:
scores.append(s)
details["drawdown"] = {"value": dd, "score": s}
if not scores:
return None, details, 0
if ml_weights:
# ML-weighted composite
weighted_sum = 0.0
weight_total = 0.0
for metric_key, info in details.items():
ml_key = _BT_ML_KEY_MAP.get(metric_key, metric_key)
w = ml_weights.get(ml_key, 0.0)
weighted_sum += info["score"] * w
weight_total += w
if weight_total > 0:
composite = weighted_sum / weight_total * 10
else:
composite = sum(scores) / len(scores) * 10
else:
composite = sum(scores) / len(scores) * 10
return round(composite, 1), details, len(scores)
def compute_forward_returns(price_lookup, dates_sorted):
"""Precompute forward returns for all dates."""
periods = [30, 90, 180, 365]
returns = {}
for d in dates_sorted:
p0 = price_lookup.get(d)
if p0 is None or p0 <= 0:
continue
r = {}
dt = datetime.strptime(d, "%Y-%m-%d")
for days in periods:
future = (dt + timedelta(days=days)).strftime("%Y-%m-%d")
pf = price_lookup.get(future)
if pf is not None:
r[f"{days}d"] = round(((pf - p0) / p0) * 100, 2)
if r:
returns[d] = r
return returns
def compute_max_drawdown_forward(price_lookup, date, window=90):
"""Compute max drawdown within N days after a given date."""
dt = datetime.strptime(date, "%Y-%m-%d")
p0 = price_lookup.get(date)
if p0 is None or p0 <= 0:
return None
peak = p0
max_dd = 0
for i in range(1, window + 1):
future = (dt + timedelta(days=i)).strftime("%Y-%m-%d")
pf = price_lookup.get(future)
if pf is None:
continue
if pf > peak:
peak = pf
dd = ((peak - pf) / peak) * 100
if dd > max_dd:
max_dd = dd
return round(max_dd, 2) if max_dd > 0 else 0
def run_backtest(ml_mode=False):
"""Run the full backtest and return comprehensive results.
If ml_mode=True, uses ML-optimized metric weights instead of equal weights.
"""
log.info("Loading historical data... (ml_mode=%s)", ml_mode)
if not os.path.exists(HISTORY_PATH):
return {"error": "No historical data found. Run history collector first."}
with open(HISTORY_PATH) as f:
history = json.load(f)
index = _build_daily_index(history)
# Build price lookup (prefer coingecko for completeness)
price_lookup = {}
for pk in ["btc_price_coingecko", "btc_price", "btc_price_sma", "btc_price_lth"]:
if pk in index:
for d, v in index[pk].items():
if d not in price_lookup:
price_lookup[d] = v
all_dates = _get_all_dates(index)
if not all_dates:
return {"error": "No date data available."}
log.info("Date range: %s to %s (%d days)", all_dates[0], all_dates[-1], len(all_dates))
# Compute drawdowns
drawdowns = _compute_ath_series(price_lookup, all_dates)
# Precompute forward returns
log.info("Computing forward returns...")
fwd_returns = compute_forward_returns(price_lookup, all_dates)
# Load ML weights if in ML mode
ml_weights = _load_ml_weights() if ml_mode else None
if ml_mode and not ml_weights:
log.warning("ML mode requested but no weights found — falling back to equal weights")
ml_weights = None
# Score each day
log.info("Scoring %d days...", len(all_dates))
daily_scores = []
for d in all_dates:
composite, details, n_metrics = score_day(d, index, drawdowns, ml_weights=ml_weights)
if composite is not None and n_metrics >= 3: # Require at least 3 metrics
price = price_lookup.get(d)
entry = {
"date": d,
"score": composite,
"n_metrics": n_metrics,
"price": price,
"forward_returns": fwd_returns.get(d, {}),
}
daily_scores.append(entry)
if not daily_scores:
return {"error": "No scored days (insufficient metric overlap)."}
log.info("Scored %d days with 3+ metrics", len(daily_scores))
# --- Bracket statistics ---
bracket_stats = []
for low, high, label in BRACKETS:
days_in = [d for d in daily_scores if low <= d["score"] <= high]
if not days_in:
bracket_stats.append({
"range": f"{low}-{high}", "label": label, "days": 0,
})
continue
stats = {"range": f"{low}-{high}", "label": label, "days": len(days_in)}
for period in ["30d", "90d", "180d", "365d"]:
returns = [d["forward_returns"][period] for d in days_in if period in d["forward_returns"]]
if returns:
returns_sorted = sorted(returns)
stats[f"avg_{period}"] = round(sum(returns) / len(returns), 2)
stats[f"median_{period}"] = round(returns_sorted[len(returns_sorted) // 2], 2)
stats[f"win_rate_{period}"] = round(len([r for r in returns if r > 0]) / len(returns) * 100, 1)
stats[f"max_gain_{period}"] = round(max(returns), 2)
stats[f"max_loss_{period}"] = round(min(returns), 2)
stats[f"n_{period}"] = len(returns)
# Average max drawdown within 90 days
dd_list = []
for d in days_in:
dd = compute_max_drawdown_forward(price_lookup, d["date"], 90)
if dd is not None:
dd_list.append(dd)
if dd_list:
stats["avg_max_drawdown_90d"] = round(sum(dd_list) / len(dd_list), 2)
bracket_stats.append(stats)
# --- Peak signal events ---
signal_events = []
thresholds = [90, 80, 70]
for thresh in thresholds:
prev_score = 0
for d in daily_scores:
if d["score"] >= thresh and prev_score < thresh:
event = {
"date": d["date"],
"score": d["score"],
"threshold": thresh,
"price": d["price"],
"forward_returns": d["forward_returns"],
}
# Add future prices
if d["price"]:
dt = datetime.strptime(d["date"], "%Y-%m-%d")
for days_ahead in [30, 90, 365]:
future = (dt + timedelta(days=days_ahead)).strftime("%Y-%m-%d")
fp = price_lookup.get(future)
if fp:
event[f"price_{days_ahead}d"] = round(fp, 2)
signal_events.append(event)
prev_score = d["score"]
signal_events.sort(key=lambda e: e["date"])
# --- Current signal context ---
all_scores_list = [d["score"] for d in daily_scores]
all_scores_list.sort()
# Get current score from cache
current_score = None
current_price = None
if os.path.exists(CACHE_PATH):
try:
with open(CACHE_PATH) as f:
cache = json.load(f)
scored = cache.get("_scored", {})
current_score = scored.get("composite_score")
current_price = cache.get("price", {}).get("price")
except Exception:
pass
# If no cache, use latest daily score
if current_score is None and daily_scores:
current_score = daily_scores[-1]["score"]
current_price = daily_scores[-1].get("price")
current_context = None
if current_score is not None:
# Percentile
below = len([s for s in all_scores_list if s <= current_score])
percentile = round(below / len(all_scores_list) * 100, 1)
# Find comparable historical periods
comparable = []
margin = 5
for d in daily_scores:
if abs(d["score"] - current_score) <= margin and d["forward_returns"]:
comparable.append(d)
avg_returns = {}
if comparable:
for period in ["30d", "90d", "180d", "365d"]:
vals = [d["forward_returns"][period] for d in comparable if period in d["forward_returns"]]
if vals:
avg_returns[period] = round(sum(vals) / len(vals), 2)
avg_1yr = avg_returns.get("365d")
# Best comparable examples — one per market cycle for diversity
# Cycles: pre-2016, 2016-2017 bull, 2018-2019 bear, 2020-2021 bull, 2022-2023 bear, 2024+
cycle_bins = [
("pre-2016", "2010-01-01", "2015-12-31"),
("2016-17 Bull", "2016-01-01", "2017-12-31"),
("2018-19 Bear", "2018-01-01", "2019-12-31"),
("2020-21 Bull", "2020-01-01", "2021-12-31"),
("2022-23 Bear", "2022-01-01", "2023-12-31"),
("2024+", "2024-01-01", "2099-12-31"),
]
examples = []
used_cycles = set()
# Sort comparable by closest score first, then pick one per cycle
sorted_comp = sorted(comparable, key=lambda d: abs(d["score"] - current_score))
for d in sorted_comp:
cycle_label = None
for label, start, end in cycle_bins:
if start <= d["date"] <= end:
cycle_label = label
break
if cycle_label and cycle_label not in used_cycles:
used_cycles.add(cycle_label)
examples.append({
"date": d["date"],
"score": d["score"],
"price": d["price"],
"forward_returns": d["forward_returns"],
"cycle": cycle_label,
})
if len(examples) >= 6:
break
# Sort examples chronologically
examples.sort(key=lambda d: d["date"])
current_context = {
"current_score": current_score,
"current_price": current_price,
"percentile": percentile,
"comparable_days": len(comparable),
"avg_1yr_return": avg_1yr,
"avg_30d_return": avg_returns.get("30d"),
"avg_90d_return": avg_returns.get("90d"),
"avg_180d_return": avg_returns.get("180d"),
"examples": examples,
}
# --- Build time series for charting ---
# Smart downsampling: daily for last 2 years, weekly before that
chart_data = []
import datetime as _dt
try:
last_date = _dt.datetime.strptime(daily_scores[-1]["date"], "%Y-%m-%d")
cutoff_date = (last_date - _dt.timedelta(days=730)).strftime("%Y-%m-%d")
except Exception:
cutoff_date = "2024-01-01"
for i, d in enumerate(daily_scores):
is_recent = d["date"] >= cutoff_date
if is_recent or i % 7 == 0 or i == len(daily_scores) - 1:
chart_data.append({
"date": d["date"],
"score": d["score"],
"price": d["price"],
})
result = {
"date_range": {"start": daily_scores[0]["date"], "end": daily_scores[-1]["date"]},
"total_days_scored": len(daily_scores),
"bracket_stats": bracket_stats,
"signal_events": signal_events,
"current_context": current_context,
"chart_data": chart_data,
"ml_mode": ml_mode,
"computed_at": datetime.utcnow().isoformat() + "Z",
}
log.info("Backtest complete: %d days, %d signal events", len(daily_scores), len(signal_events))
return result