BizzleBot 4647c596b3 feat: ML-optimized accumulation scoring with dashboard toggle
Train a GradientBoostingClassifier on 2,601 days of historical data
(2018-2025) to find optimal metric weights for identifying the best
long-term buying opportunities. Uses time-series cross-validation
to prevent look-ahead bias.

Key results:
- pct_above_200w_sma: 50.7% weight (was 11.1% equal)
- drawdown: 14.6%, lth_rp: 10.9%, rhodl: 8.9%
- fear_greed demoted from 11.1% to 5.1%
- nupl/mvrv nearly eliminated (0.7-1.8%)

ML Strong Accumulation bracket: avg +210% 1yr (vs +176% classic)

New files: ml/optimizer.py, config/ml_weights.json
Modified: scoring/engine.py (score_all_ml), backtesting/engine.py
(ml_mode), dashboard/server.py (Classic/ML toggle)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 23:18:29 +00:00

563 lines
20 KiB
Python

#!/usr/bin/env python3
"""
ML Optimizer for Bitcoin Accumulation Zone Scoring.
Trains a gradient boosted tree model on historical on-chain metrics to find
optimal metric weights for identifying the best long-term buying opportunities.
Output: config/ml_weights.json with optimized weights and feature importances.
"""
import json
import logging
import os
import sys
from datetime import datetime, timedelta, timezone

import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import (
    classification_report,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
log = logging.getLogger("ml-optimizer")

# Project root: the parent of the directory that contains this file.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
HISTORY_PATH = os.path.join(BASE_DIR, "data", "history.json")
OUTPUT_PATH = os.path.join(BASE_DIR, "config", "ml_weights.json")
THRESHOLDS_PATH = os.path.join(BASE_DIR, "config", "thresholds.json")

# Date range: 2018-02-01 onward (when all 8 metrics + fear_greed available)
START_DATE = "2018-02-01"

# Training cutoff: need 1yr forward data for labels
TRAIN_CUTOFF_DAYS = 365

# Target: forward 365d return > 30% = "good time to buy"
GOOD_BUY_THRESHOLD = 30.0

# The six metrics read directly from the history series, one series per key.
# The other three scored metrics (drawdown and the two ratio metrics below)
# are derived from price inside build_dataset, for nine scored metrics total.
METRIC_KEYS = [
    "puell_multiple",
    "mvrv_zscore",
    "reserve_risk",
    "rhodl_ratio",
    "nupl",
    "fear_greed",
]

# Ratio-based metrics (derived from price vs reference)
RATIO_METRICS = {
    "pct_above_200w_sma": {"price_key": "btc_price", "ref_key": "200w_sma"},
    "pct_above_lth_rp": {"price_key": "btc_price", "ref_key": "lth_realized_price"},
}
def load_history(path=None):
    """Load historical data and build a date-aligned lookup per series.

    Args:
        path: Optional path of the history JSON file. When omitted, the
            module-level ``HISTORY_PATH`` is used.

    Returns:
        A dict mapping each series key to a ``{date: value}`` dict. Top-level
        entries that are not objects carrying both "dates" and "values" are
        skipped, and points whose value is null are dropped.
    """
    if path is None:
        path = HISTORY_PATH
    with open(path, encoding="utf-8") as f:
        raw = json.load(f)

    index = {}
    for key, series in raw.items():
        if not isinstance(series, dict):
            continue
        # Require both arrays: a series with dates but no values used to
        # abort the whole load with a KeyError.
        if "dates" not in series or "values" not in series:
            continue
        index[key] = {
            d: v
            for d, v in zip(series["dates"], series["values"])
            if v is not None
        }
    return index
def load_thresholds(path=None):
    """Load scoring thresholds for converting raw values to 0-10 scores.

    Args:
        path: Optional path of the thresholds JSON file. When omitted, the
            module-level ``THRESHOLDS_PATH`` is used.

    Returns:
        The parsed JSON document, unmodified.
    """
    if path is None:
        path = THRESHOLDS_PATH
    with open(path, encoding="utf-8") as f:
        return json.load(f)
def score_range(value, ranges):
    """Map a raw value onto a score via range thresholds (same logic as scoring/engine.py).

    ``ranges`` is an iterable of ``(low, high, score)`` triples. A triple
    matches when ``low <= value < high``; a ``None`` bound is open-ended.
    The first matching triple wins.

    Returns:
        ``None`` when ``value`` is ``None``, the score of the first matching
        triple otherwise, or ``0`` when no triple matches.
    """
    if value is None:
        return None
    for lower, upper, points in ranges:
        # Negated comparisons (rather than the flipped operator) keep the
        # original outcome for values that compare False both ways.
        if lower is not None and not value >= lower:
            continue
        if upper is not None and not value < upper:
            continue
        return points
    return 0
def _merge_price_series(index):
    """Merge the BTC price series into one ``{date: price}`` dict; earlier series win on overlap."""
    merged = {}
    for series_key in ("btc_price", "btc_price_sma", "btc_price_lth"):
        for d, v in index.get(series_key, {}).items():
            merged.setdefault(d, v)
    return merged


def _drawdown_history(sorted_dates, price_lookup):
    """Walk prices in date order and return ``(drawdowns, prev_near_ath)``.

    ``drawdowns[d]`` is the percentage that the price on ``d`` sits below the
    running all-time high. ``prev_near_ath[d]`` is the datetime of the latest
    date strictly before ``d`` whose drawdown was under 0.1%, or ``None``
    when there has been none yet.
    """
    running_ath = 0
    drawdowns = {}
    prev_near_ath = {}
    last_near_ath = None
    for d in sorted_dates:
        p = price_lookup.get(d)
        if p is None:
            continue
        if p > running_ath:
            running_ath = p
        if running_ath <= 0:
            continue
        # Record the marker BEFORE updating it so the current day is excluded.
        prev_near_ath[d] = last_near_ath
        dd = ((running_ath - p) / running_ath) * 100
        drawdowns[d] = dd
        if dd < 0.1:  # essentially at ATH
            try:
                last_near_ath = datetime.strptime(d, "%Y-%m-%d")
            except ValueError:
                # A key that is not a YYYY-MM-DD date cannot serve as a
                # look-back anchor; keep the previous anchor.
                pass
    return drawdowns, prev_near_ath


def build_dataset(index, thresholds):
    """Build the aligned training dataset: metric scores plus forward returns.

    Args:
        index: ``{series_key: {date: value}}`` as produced by ``load_history``.
            Dates are expected to be ``YYYY-MM-DD`` strings, so that string
            order is chronological order.
        thresholds: Mapping of threshold name to a dict whose ``"ranges"``
            entry is a list of ``(low, high, score)`` triples.

    Returns:
        A list of row dicts in ascending date order, one per date on or after
        ``START_DATE`` for which every metric in ``METRIC_KEYS``, the price,
        the 200-week SMA and the LTH realized price are all present. Each row
        holds ``date``, ``price``, ``score_<metric>``, ``raw_<metric>``,
        ``delta_30d_<metric>``, the two interaction terms, ``days_since_ath``
        and ``fwd_<N>d`` for each horizon whose future price exists. The list
        is empty when the history has no date on or after ``START_DATE``.
    """
    all_dates = set()
    for lookup in index.values():
        all_dates.update(lookup)
    all_dates_sorted = sorted(all_dates)
    dates = [d for d in all_dates_sorted if d >= START_DATE]
    if not dates:
        # Previously this fell through to dates[0] and raised IndexError.
        log.warning("No history dates on or after %s; dataset is empty", START_DATE)
        return []

    price_lookup = _merge_price_series(index)
    # The ATH is tracked over the whole history, not only from START_DATE.
    drawdowns, prev_near_ath = _drawdown_history(all_dates_sorted, price_lookup)

    # Scored metric -> name of its entry in the thresholds file.
    threshold_key_for = {
        "puell_multiple": "puell_multiple",
        "mvrv_zscore": "mvrv_zscore",
        "reserve_risk": "reserve_risk",
        "rhodl_ratio": "rhodl_ratio",
        "nupl": "nupl",
        "fear_greed": "fear_greed",
        "drawdown": "drawdown",
        "pct_above_200w_sma": "price_vs_200w_sma",
        "pct_above_lth_rp": "lth_realized_price",
    }
    metric_ranges = {
        metric: thresholds.get(threshold_key, {}).get("ranges", [])
        for metric, threshold_key in threshold_key_for.items()
    }

    metric_series = {key: index.get(key, {}) for key in METRIC_KEYS}
    sma_series = index.get("200w_sma", {})
    lth_series = index.get("lth_realized_price", {})
    max_lookback = 2000  # cap (in days) for the days-since-ATH feature

    log.info("Building dataset from %d dates (%s to %s)", len(dates), dates[0], dates[-1])
    rows = []
    for d in dates:
        # Raw metric values; a date missing any of them is dropped.
        vals = {key: metric_series[key].get(d) for key in METRIC_KEYS}
        if any(v is None for v in vals.values()):
            continue

        # Ratio metrics need the price and both reference series.
        price = price_lookup.get(d)
        sma_200w = sma_series.get(d)
        lth_rp = lth_series.get(d)
        if price is None or sma_200w is None or lth_rp is None:
            continue
        if sma_200w == 0 or lth_rp == 0:
            continue
        vals["pct_above_200w_sma"] = ((price - sma_200w) / sma_200w) * 100
        vals["pct_above_lth_rp"] = ((price - lth_rp) / lth_rp) * 100
        vals["drawdown"] = drawdowns.get(d, 0)

        # Score each metric using the existing thresholds.
        scores = {
            metric: score_range(vals[metric], metric_ranges[metric])
            for metric in threshold_key_for
        }
        if any(s is None for s in scores.values()):
            continue

        # Forward returns, only for horizons whose future price is known.
        dt = datetime.strptime(d, "%Y-%m-%d")
        fwd = {}
        for days in [30, 90, 180, 365]:
            future_d = (dt + timedelta(days=days)).strftime("%Y-%m-%d")
            fp = price_lookup.get(future_d)
            if fp is not None and price > 0:
                fwd[f"fwd_{days}d"] = ((fp - price) / price) * 100

        # Rate-of-change features (30d deltas). These are plain differences,
        # so an earlier value of exactly 0 is a valid baseline; only a
        # missing earlier value falls back to 0.0.
        d_30ago = (dt - timedelta(days=30)).strftime("%Y-%m-%d")
        deltas = {}
        for key in ("mvrv_zscore", "nupl", "puell_multiple", "reserve_risk"):
            v_prev = index.get(key, {}).get(d_30ago)
            deltas[f"delta_30d_{key}"] = (vals[key] - v_prev) if v_prev is not None else 0.0

        # Interaction terms
        interactions = {
            "mvrv_x_nupl": vals["mvrv_zscore"] * vals["nupl"],
            "puell_x_reserve": vals["puell_multiple"] * vals["reserve_risk"],
        }

        # Days since the most recent earlier day within 0.1% of the ATH,
        # capped at max_lookback. The current day itself is not considered.
        # NOTE(review): a fresh-ATH day therefore reports the distance to the
        # previous near-ATH day rather than 0 -- confirm this is intended.
        anchor = prev_near_ath.get(d)
        if anchor is None:
            days_since_ath = max_lookback
        else:
            days_since_ath = min((dt - anchor).days, max_lookback)

        rows.append({
            "date": d,
            "price": price,
            **{f"score_{k}": v for k, v in scores.items()},
            **{f"raw_{k}": v for k, v in vals.items()},
            **deltas,
            **interactions,
            "days_since_ath": days_since_ath,
            **fwd,
        })

    log.info("Built %d complete data rows", len(rows))
    return rows
def _make_classifier():
    """Return an unfitted classifier with the hyper-parameters shared by CV and the final fit."""
    return GradientBoostingClassifier(
        n_estimators=300,
        learning_rate=0.05,
        max_depth=4,
        subsample=0.8,
        min_samples_leaf=20,
        random_state=42,
    )


def _mean_or_zero(values):
    """Mean of ``values`` as a float; 0.0 for an empty list so the JSON output never holds NaN."""
    return float(np.mean(values)) if values else 0.0


def _std_or_zero(values):
    """Population standard deviation of ``values`` as a float; 0.0 for an empty list."""
    return float(np.std(values)) if values else 0.0


def train_model(rows):
    """Train a gradient boosted classifier and derive per-metric weights.

    Rows carrying a ``fwd_365d`` return are labeled 1 when that return exceeds
    ``GOOD_BUY_THRESHOLD`` and 0 otherwise (the label is also written back to
    each row as ``target``). The model is evaluated with expanding-window
    time-series cross-validation, then refit on all labeled rows; its feature
    importances are aggregated per metric and normalized into weights.

    Args:
        rows: Row dicts in ascending date order, as built by ``build_dataset``.

    Returns:
        A JSON-serializable dict with ``weights``, ``feature_importances``,
        ``cv_results``, ``training_info``, ``comparison`` and ``trained_at``,
        or ``None`` when there are fewer than 100 labeled rows or the target
        contains a single class.
    """
    # Filter to rows that have 365d forward return (for labeling)
    labeled = [r for r in rows if "fwd_365d" in r]
    log.info("Rows with 365d forward data: %d", len(labeled))
    if len(labeled) < 100:
        log.error("Not enough labeled data. Need at least 100 rows, got %d", len(labeled))
        return None

    # Create binary target: forward 365d return > threshold
    for r in labeled:
        r["target"] = 1 if r["fwd_365d"] > GOOD_BUY_THRESHOLD else 0
    positive = sum(r["target"] for r in labeled)
    log.info("Target distribution: %d positive (%.1f%%), %d negative",
             positive, positive / len(labeled) * 100, len(labeled) - positive)
    if positive in (0, len(labeled)):
        # The classifier cannot be fit on one class; fail like the
        # not-enough-data path instead of raising from fit().
        log.error("Target has a single class (%d positive of %d rows); cannot train",
                  positive, len(labeled))
        return None

    # Feature columns: scores + raw values + deltas + interactions + cycle position
    score_features = [
        "score_puell_multiple", "score_mvrv_zscore", "score_reserve_risk",
        "score_rhodl_ratio", "score_nupl", "score_fear_greed",
        "score_drawdown", "score_pct_above_200w_sma", "score_pct_above_lth_rp",
    ]
    raw_features = [
        "raw_puell_multiple", "raw_mvrv_zscore", "raw_reserve_risk",
        "raw_rhodl_ratio", "raw_nupl", "raw_fear_greed",
        "raw_pct_above_200w_sma", "raw_pct_above_lth_rp", "raw_drawdown",
    ]
    delta_features = [
        "delta_30d_mvrv_zscore", "delta_30d_nupl",
        "delta_30d_puell_multiple", "delta_30d_reserve_risk",
    ]
    interaction_features = ["mvrv_x_nupl", "puell_x_reserve"]
    cycle_features = ["days_since_ath"]
    feature_cols = (
        score_features + raw_features + delta_features
        + interaction_features + cycle_features
    )
    X = np.array([[r[f] for f in feature_cols] for r in labeled])
    y = np.array([r["target"] for r in labeled])
    log.info("Feature matrix: %d samples x %d features", X.shape[0], X.shape[1])

    # Time-series cross-validation (expanding window, 5 splits).
    # Each label looks TRAIN_CUTOFF_DAYS ahead, so the training rows closest
    # to a validation window carry labels computed from prices inside that
    # window. Dropping that many samples between train and validation removes
    # the leak. Rows are at most one per day, so the gap spans at least that
    # many calendar days. The gap is clamped to the largest value
    # TimeSeriesSplit accepts for this sample count.
    n_splits = 5
    fold_size = len(labeled) // (n_splits + 1)
    max_gap = len(labeled) - n_splits * fold_size - 1
    gap = max(0, min(TRAIN_CUTOFF_DAYS, max_gap))
    if gap < TRAIN_CUTOFF_DAYS:
        log.warning("Only %d samples: CV gap reduced to %d (label horizon is %d); "
                    "CV metrics may be optimistic", len(labeled), gap, TRAIN_CUTOFF_DAYS)
    tscv = TimeSeriesSplit(n_splits=n_splits, gap=gap)

    cv_scores = []
    cv_f1 = []
    cv_precision = []
    cv_recall = []
    for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]
        train_dates = f"{labeled[train_idx[0]]['date']} to {labeled[train_idx[-1]]['date']}"
        val_dates = f"{labeled[val_idx[0]]['date']} to {labeled[val_idx[-1]]['date']}"

        if len(np.unique(y_train)) < 2:
            # fit() rejects a single-class target; skip instead of crashing.
            log.warning("Fold %d: Train %s has a single class; fold skipped",
                        fold + 1, train_dates)
            continue

        scaler = StandardScaler()
        X_train_s = scaler.fit_transform(X_train)
        X_val_s = scaler.transform(X_val)
        model = _make_classifier()
        model.fit(X_train_s, y_train)
        y_pred = model.predict(X_val_s)
        y_prob = model.predict_proba(X_val_s)[:, 1]

        if len(np.unique(y_val)) > 1:
            auc = roc_auc_score(y_val, y_prob)
            cv_scores.append(auc)
        else:
            # AUC is undefined with one class present; leave the fold out of
            # the average rather than counting it as 0.
            auc = float("nan")
        f1 = f1_score(y_val, y_pred, zero_division=0)
        prec = precision_score(y_val, y_pred, zero_division=0)
        rec = recall_score(y_val, y_pred, zero_division=0)
        cv_f1.append(f1)
        cv_precision.append(prec)
        cv_recall.append(rec)
        log.info("Fold %d: Train %s | Val %s | AUC=%.3f F1=%.3f P=%.3f R=%.3f",
                 fold + 1, train_dates, val_dates, auc, f1, prec, rec)

    log.info("CV Mean AUC: %.3f (+/- %.3f)", _mean_or_zero(cv_scores), _std_or_zero(cv_scores))
    log.info("CV Mean F1: %.3f (+/- %.3f)", _mean_or_zero(cv_f1), _std_or_zero(cv_f1))

    # Train final model on all labeled data
    log.info("Training final model on all %d labeled samples...", len(labeled))
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    final_model = _make_classifier()
    final_model.fit(X_scaled, y)

    # Feature importances, highest first
    feat_imp = sorted(
        zip(feature_cols, final_model.feature_importances_),
        key=lambda pair: pair[1],
        reverse=True,
    )
    log.info("\nFeature Importance Ranking:")
    log.info("-" * 50)
    for name, imp in feat_imp:
        bar = "#" * int(imp * 200)
        log.info(" %-30s %.4f %s", name, imp, bar)

    # Aggregate importance per metric: map each feature to its parent
    # metric(s); a feature with several parents is split evenly among them.
    metric_names = [
        "puell_multiple", "mvrv_zscore", "reserve_risk", "rhodl_ratio",
        "nupl", "fear_greed", "drawdown", "pct_above_200w_sma", "pct_above_lth_rp",
    ]
    feature_parents = {}
    for m in metric_names:
        feature_parents[f"score_{m}"] = (m,)
        feature_parents[f"raw_{m}"] = (m,)
    # Delta features map to their base metric
    for m in ("mvrv_zscore", "nupl", "puell_multiple", "reserve_risk"):
        feature_parents[f"delta_30d_{m}"] = (m,)
    # Interaction terms split evenly between constituent metrics
    feature_parents["mvrv_x_nupl"] = ("mvrv_zscore", "nupl")
    feature_parents["puell_x_reserve"] = ("puell_multiple", "reserve_risk")
    # days_since_ath maps to drawdown conceptually
    feature_parents["days_since_ath"] = ("drawdown",)

    metric_importances = {m: 0.0 for m in metric_names}
    for name, imp in feat_imp:
        parents = feature_parents.get(name, ())
        for parent in parents:
            metric_importances[parent] += imp / len(parents)

    # Normalize weights to sum to 1 (equal weights if every importance is 0)
    total_imp = sum(metric_importances.values())
    if total_imp > 0:
        weights = {k: round(v / total_imp, 4) for k, v in metric_importances.items()}
    else:
        weights = {k: round(1 / len(metric_importances), 4) for k in metric_importances}
    # Sort by weight descending
    weights = dict(sorted(weights.items(), key=lambda item: item[1], reverse=True))

    log.info("\nOptimal Metric Weights:")
    log.info("-" * 50)
    equal_weight = round(1 / len(weights), 4)
    for metric, w in weights.items():
        change = "+" if w > equal_weight else ""
        diff = (w - equal_weight) / equal_weight * 100
        log.info(" %-25s %.4f (%s%.0f%% vs equal)", metric, w, change, diff)

    # Run comparison backtest: ML-weighted vs equal-weight
    log.info("\n" + "=" * 60)
    log.info("COMPARISON BACKTEST: ML-Weighted vs Equal-Weight")
    log.info("=" * 60)
    comparison = run_comparison(rows, weights)

    # Build output
    return {
        "weights": weights,
        "feature_importances": {name: round(float(imp), 6) for name, imp in feat_imp},
        "cv_results": {
            "mean_auc": round(_mean_or_zero(cv_scores), 4),
            "std_auc": round(_std_or_zero(cv_scores), 4),
            "mean_f1": round(_mean_or_zero(cv_f1), 4),
            "mean_precision": round(_mean_or_zero(cv_precision), 4),
            "mean_recall": round(_mean_or_zero(cv_recall), 4),
        },
        "training_info": {
            "n_samples": len(labeled),
            "n_positive": int(positive),
            "positive_rate": round(positive / len(labeled), 4),
            "n_features": len(feature_cols),
            "target_threshold": GOOD_BUY_THRESHOLD,
            "date_range": f"{labeled[0]['date']} to {labeled[-1]['date']}",
            "model": "GradientBoostingClassifier",
            "cv_gap_samples": gap,
        },
        "comparison": comparison,
        "trained_at": datetime.now(tz=timezone.utc).isoformat(),
    }
def run_comparison(rows, ml_weights):
    """Compare ML-weighted scoring vs equal-weight scoring across score brackets.

    For every row that has a ``fwd_365d`` return, a composite score is
    computed twice -- as the plain mean of the nine ``score_*`` values and as
    their ``ml_weights``-weighted sum, both multiplied by 10 -- and stored on
    the row as ``composite_equal_weight`` / ``composite_ml_weighted``. Rows
    are then grouped into score brackets and the 365-day forward returns of
    each bracket are summarized and logged.

    Args:
        rows: Row dicts as built by ``build_dataset``; mutated in place with
            the two composite keys.
        ml_weights: Mapping of metric name to weight. A metric missing from
            the mapping falls back to the equal weight.

    Returns:
        ``{"equal_weight": [...], "ml_weighted": [...]}`` where each list has
        one dict per bracket with ``range``, ``label``, ``days``,
        ``avg_365d``, ``median_365d`` and ``win_rate_365d`` (the last three
        are ``None`` for an empty bracket).
    """
    # Metrics used in scoring (maps to score_* columns)
    score_keys = [
        "puell_multiple", "mvrv_zscore", "reserve_risk", "rhodl_ratio",
        "nupl", "fear_greed", "drawdown", "pct_above_200w_sma", "pct_above_lth_rp",
    ]
    n_metrics = len(score_keys)
    equal_weight = 1.0 / n_metrics
    brackets = [
        (0, 20, "Extreme Caution"),
        (21, 40, "Caution"),
        (41, 55, "Neutral"),
        (56, 70, "Moderate Opportunity"),
        (71, 85, "Strong Accumulation"),
        (86, 100, "Extreme Accumulation"),
    ]

    # Only use rows with forward returns
    scored_rows = [r for r in rows if "fwd_365d" in r]
    results = {"equal_weight": [], "ml_weighted": []}
    for mode in ("equal_weight", "ml_weighted"):
        composite_key = f"composite_{mode}"
        for r in scored_rows:
            if mode == "equal_weight":
                composite = sum(r[f"score_{k}"] for k in score_keys) / n_metrics * 10
            else:
                weighted_sum = sum(
                    r[f"score_{k}"] * ml_weights.get(k, equal_weight) for k in score_keys
                )
                composite = weighted_sum * 10
            r[composite_key] = composite

        for pos, (low, high, label) in enumerate(brackets):
            # Composites are floats, so the printed integer bounds leave
            # gaps (e.g. 20.5 sits between "0-20" and "21-40"). Each bracket
            # therefore extends up to the next bracket's lower bound, and
            # the last bracket is open-ended so that weight rounding cannot
            # push a row above it.
            # NOTE(review): fractional composites are assigned to the lower
            # bracket here -- confirm this matches the live scoring engine.
            if pos + 1 < len(brackets):
                upper = brackets[pos + 1][0]
                days_in = [r for r in scored_rows if low <= r[composite_key] < upper]
            else:
                days_in = [r for r in scored_rows if r[composite_key] >= low]

            if not days_in:
                results[mode].append({
                    "range": f"{low}-{high}",
                    "label": label,
                    "days": 0,
                    "avg_365d": None,
                    "median_365d": None,
                    "win_rate_365d": None,
                })
                continue

            returns_365 = [r["fwd_365d"] for r in days_in]
            wins = sum(1 for ret in returns_365 if ret > 0)
            results[mode].append({
                "range": f"{low}-{high}",
                "label": label,
                "days": len(days_in),
                "avg_365d": round(sum(returns_365) / len(returns_365), 2),
                # True median: averages the two middle values for even counts.
                "median_365d": round(float(np.median(returns_365)), 2),
                "win_rate_365d": round(wins / len(returns_365) * 100, 1),
            })

    # Print comparison
    log.info("\n%-18s | %-8s %-8s %-8s | %-8s %-8s %-8s",
             "Bracket", "EQ Avg", "EQ Med", "EQ Win%", "ML Avg", "ML Med", "ML Win%")
    log.info("-" * 80)
    for eq, ml in zip(results["equal_weight"], results["ml_weighted"]):
        eq_avg = f"{eq['avg_365d']:.1f}%" if eq["avg_365d"] is not None else "--"
        eq_med = f"{eq['median_365d']:.1f}%" if eq.get("median_365d") is not None else "--"
        eq_win = f"{eq['win_rate_365d']:.0f}%" if eq.get("win_rate_365d") is not None else "--"
        ml_avg = f"{ml['avg_365d']:.1f}%" if ml["avg_365d"] is not None else "--"
        ml_med = f"{ml['median_365d']:.1f}%" if ml.get("median_365d") is not None else "--"
        ml_win = f"{ml['win_rate_365d']:.0f}%" if ml.get("win_rate_365d") is not None else "--"
        log.info("%-18s | %-8s %-8s %-8s | %-8s %-8s %-8s",
                 eq["label"], eq_avg, eq_med, eq_win, ml_avg, ml_med, ml_win)
    return results
def main():
    """Run the optimizer end to end: load data, build rows, train, save, summarize.

    Exits with status 1 when the history file or the thresholds file is
    missing, or when training returns no result. On success the result dict
    is written as JSON to ``OUTPUT_PATH`` and a summary is logged.
    """
    log.info("=" * 60)
    log.info("Bitcoin Accumulation Zone ML Optimizer")
    log.info("=" * 60)

    if not os.path.exists(HISTORY_PATH):
        log.error("No historical data at %s. Run history collector first.", HISTORY_PATH)
        sys.exit(1)
    # Same friendly failure for the second required input, instead of an
    # unhandled FileNotFoundError from load_thresholds().
    if not os.path.exists(THRESHOLDS_PATH):
        log.error("No scoring thresholds at %s.", THRESHOLDS_PATH)
        sys.exit(1)

    # Load data
    log.info("Loading historical data...")
    index = load_history()
    thresholds = load_thresholds()

    # Build dataset
    log.info("Building training dataset...")
    rows = build_dataset(index, thresholds)

    # Train model
    log.info("Training ML model...")
    result = train_model(rows)
    if result is None:
        log.error("Training failed.")
        sys.exit(1)

    # Save weights (create the target directory on a fresh checkout)
    os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True)
    with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    log.info("\nSaved ML weights to %s", OUTPUT_PATH)

    # Print summary
    training_info = result["training_info"]
    cv_results = result["cv_results"]
    log.info("\n" + "=" * 60)
    log.info("SUMMARY")
    log.info("=" * 60)
    log.info("Model: %s", training_info["model"])
    log.info("Samples: %d (%d positive)", training_info["n_samples"], training_info["n_positive"])
    log.info("CV AUC: %.3f (+/- %.3f)", cv_results["mean_auc"], cv_results["std_auc"])
    log.info("CV F1: %.3f", cv_results["mean_f1"])
    log.info("\nTop 5 Feature Importances:")
    # feature_importances is stored highest-first, so the first five are the top five.
    for name, imp in list(result["feature_importances"].items())[:5]:
        log.info(" %-30s %.4f", name, imp)
    log.info("\nMetric Weights (ML-Optimized):")
    for metric, weight in result["weights"].items():
        log.info(" %-25s %.1f%%", metric, weight * 100)


if __name__ == "__main__":
    main()