Previously the backtest engine had hardcoded OLD thresholds that diverged from scoring/engine.py + config/thresholds.json. Now loads from thresholds.json directly, ensuring the chart matches the dashboard.
443 lines
16 KiB
Python
443 lines
16 KiB
Python
"""Historical backtest engine for Bitcoin Accumulation Zone scoring."""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
sys.path.insert(0, BASE_DIR)
|
|
|
|
HISTORY_PATH = os.path.join(BASE_DIR, "data", "history.json")
|
|
CACHE_PATH = os.path.join(BASE_DIR, "data", "cache.json")
|
|
|
|
# Score brackets matching the dashboard assessment levels
|
|
BRACKETS = [
|
|
(0, 20, "Extreme Caution"),
|
|
(21, 40, "Caution"),
|
|
(41, 55, "Neutral"),
|
|
(56, 70, "Moderate Opportunity"),
|
|
(71, 85, "Strong Accumulation"),
|
|
(86, 100, "Extreme Accumulation"),
|
|
]
|
|
|
|
# Scoring thresholds — load from config/thresholds.json (single source of truth)
|
|
import os as _os
|
|
import json as _json
|
|
|
|
_THRESH_PATH = _os.path.join(_os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))), "config", "thresholds.json")
|
|
try:
|
|
with open(_THRESH_PATH) as _f:
|
|
_THRESH = _json.load(_f)
|
|
except Exception:
|
|
_THRESH = {}
|
|
|
|
METRIC_SCORERS = {
|
|
"fear_greed": {"ranges": _THRESH.get("fear_greed", {}).get("ranges", [[0, 15, 10], [15, 30, 8], [30, 45, 5], [45, 55, 3], [55, 75, 1], [75, None, 0]])},
|
|
"puell_multiple": {"ranges": _THRESH.get("puell_multiple", {}).get("ranges", [[None, 0.4, 10], [0.4, 0.7, 8], [0.7, 1.0, 5], [1.0, 1.5, 3], [1.5, 2.0, 1], [2.0, None, 0]])},
|
|
"mvrv_zscore": {"ranges": _THRESH.get("mvrv_zscore", {}).get("ranges", [[None, 0, 10], [0, 1.0, 8], [1.0, 2.0, 5], [2.0, 3.0, 3], [3.0, 5.0, 1], [5.0, None, 0]])},
|
|
"reserve_risk": {"ranges": _THRESH.get("reserve_risk", {}).get("ranges", [[None, 0.002, 10], [0.002, 0.005, 7], [0.005, 0.01, 4], [0.01, 0.02, 2], [0.02, None, 0]])},
|
|
"rhodl_ratio": {"ranges": _THRESH.get("rhodl_ratio", {}).get("ranges", [[None, 200, 10], [200, 1000, 7], [1000, 5000, 4], [5000, 20000, 1], [20000, None, 0]])},
|
|
"nupl": {"ranges": _THRESH.get("nupl", {}).get("ranges", [[None, 0, 10], [0, 0.3, 8], [0.3, 0.5, 4], [0.5, 0.75, 1], [0.75, None, 0]])},
|
|
}
|
|
|
|
RATIO_SCORERS = {
|
|
"price_vs_200w_sma": {
|
|
"ranges": _THRESH.get("price_vs_200w_sma", {}).get("ranges", [[None, 0, 10], [0, 30, 7], [30, 60, 5], [60, 100, 2], [100, None, 0]]),
|
|
"price_key": "btc_price",
|
|
"ref_key": "200w_sma",
|
|
},
|
|
"lth_realized_price": {
|
|
"ranges": _THRESH.get("lth_realized_price", {}).get("ranges", [[None, 0, 10], [0, 30, 7], [30, 80, 5], [80, 150, 3], [150, None, 1]]),
|
|
"price_key": "btc_price",
|
|
"ref_key": "lth_realized_price",
|
|
},
|
|
}
|
|
|
|
DRAWDOWN_RANGES = _THRESH.get("drawdown", {}).get("ranges", [[60, None, 10], [40, 60, 8], [25, 40, 6], [15, 25, 4], [5, 15, 2], [None, 5, 0]])
|
|
|
|
|
|
def _score_range(value, ranges):
|
|
"""Score a value using range-based thresholds."""
|
|
if value is None:
|
|
return None
|
|
for low, high, score in ranges:
|
|
low_ok = low is None or value >= low
|
|
high_ok = high is None or value < high
|
|
if low_ok and high_ok:
|
|
return score
|
|
return 0
|
|
|
|
|
|
def _build_daily_index(history):
|
|
"""Build a dict mapping metric_key -> {date_str: value} for fast lookup."""
|
|
index = {}
|
|
for key, data in history.items():
|
|
if key.startswith("_") or not isinstance(data, dict) or "dates" not in data:
|
|
continue
|
|
lookup = {}
|
|
for d, v in zip(data["dates"], data["values"]):
|
|
lookup[d] = v
|
|
index[key] = lookup
|
|
return index
|
|
|
|
|
|
def _get_all_dates(index):
|
|
"""Get sorted union of all dates across all metrics."""
|
|
all_dates = set()
|
|
for lookup in index.values():
|
|
all_dates.update(lookup.keys())
|
|
return sorted(all_dates)
|
|
|
|
|
|
def _last_known_value(lookup, date, max_lookback=30):
|
|
"""Get value for date, or most recent prior value within lookback window."""
|
|
if date in lookup:
|
|
return lookup[date]
|
|
d = datetime.strptime(date, "%Y-%m-%d")
|
|
for i in range(1, max_lookback + 1):
|
|
prev = (d - timedelta(days=i)).strftime("%Y-%m-%d")
|
|
if prev in lookup:
|
|
return lookup[prev]
|
|
return None
|
|
|
|
|
|
def _compute_ath_series(price_lookup, dates):
|
|
"""Compute running ATH and drawdown for each date."""
|
|
ath = 0
|
|
drawdowns = {}
|
|
for d in dates:
|
|
p = price_lookup.get(d)
|
|
if p is None:
|
|
continue
|
|
if p > ath:
|
|
ath = p
|
|
if ath > 0:
|
|
drawdowns[d] = ((ath - p) / ath) * 100
|
|
return drawdowns
|
|
|
|
|
|
def score_day(date, index, drawdowns):
|
|
"""Score a single day using all available metrics. Returns (composite_score, individual_scores, n_metrics)."""
|
|
scores = []
|
|
details = {}
|
|
|
|
# Simple range-based metrics
|
|
for metric_key, cfg in METRIC_SCORERS.items():
|
|
val = _last_known_value(index.get(metric_key, {}), date)
|
|
if val is not None:
|
|
s = _score_range(val, cfg["ranges"])
|
|
if s is not None:
|
|
scores.append(s)
|
|
details[metric_key] = {"value": val, "score": s}
|
|
|
|
# Ratio-based metrics (price vs reference)
|
|
for metric_key, cfg in RATIO_SCORERS.items():
|
|
price_val = _last_known_value(index.get(cfg["price_key"], {}), date)
|
|
# Try alternate price keys
|
|
if price_val is None:
|
|
for pk in ["btc_price_coingecko", "btc_price_sma", "btc_price_lth"]:
|
|
price_val = _last_known_value(index.get(pk, {}), date)
|
|
if price_val is not None:
|
|
break
|
|
ref_val = _last_known_value(index.get(cfg["ref_key"], {}), date)
|
|
if price_val is not None and ref_val is not None and ref_val > 0:
|
|
pct_above = ((price_val - ref_val) / ref_val) * 100
|
|
s = _score_range(pct_above, cfg["ranges"])
|
|
if s is not None:
|
|
scores.append(s)
|
|
details[metric_key] = {"value": pct_above, "score": s}
|
|
|
|
# Drawdown
|
|
dd = drawdowns.get(date)
|
|
if dd is not None:
|
|
s = _score_range(dd, DRAWDOWN_RANGES)
|
|
if s is not None:
|
|
scores.append(s)
|
|
details["drawdown"] = {"value": dd, "score": s}
|
|
|
|
if not scores:
|
|
return None, details, 0
|
|
|
|
composite = sum(scores) / len(scores) * 10
|
|
return round(composite, 1), details, len(scores)
|
|
|
|
|
|
def compute_forward_returns(price_lookup, dates_sorted):
|
|
"""Precompute forward returns for all dates."""
|
|
periods = [30, 90, 180, 365]
|
|
returns = {}
|
|
for d in dates_sorted:
|
|
p0 = price_lookup.get(d)
|
|
if p0 is None or p0 <= 0:
|
|
continue
|
|
r = {}
|
|
dt = datetime.strptime(d, "%Y-%m-%d")
|
|
for days in periods:
|
|
future = (dt + timedelta(days=days)).strftime("%Y-%m-%d")
|
|
pf = price_lookup.get(future)
|
|
if pf is not None:
|
|
r[f"{days}d"] = round(((pf - p0) / p0) * 100, 2)
|
|
if r:
|
|
returns[d] = r
|
|
return returns
|
|
|
|
|
|
def compute_max_drawdown_forward(price_lookup, date, window=90):
|
|
"""Compute max drawdown within N days after a given date."""
|
|
dt = datetime.strptime(date, "%Y-%m-%d")
|
|
p0 = price_lookup.get(date)
|
|
if p0 is None or p0 <= 0:
|
|
return None
|
|
peak = p0
|
|
max_dd = 0
|
|
for i in range(1, window + 1):
|
|
future = (dt + timedelta(days=i)).strftime("%Y-%m-%d")
|
|
pf = price_lookup.get(future)
|
|
if pf is None:
|
|
continue
|
|
if pf > peak:
|
|
peak = pf
|
|
dd = ((peak - pf) / peak) * 100
|
|
if dd > max_dd:
|
|
max_dd = dd
|
|
return round(max_dd, 2) if max_dd > 0 else 0
|
|
|
|
|
|
def run_backtest():
|
|
"""Run the full backtest and return comprehensive results."""
|
|
log.info("Loading historical data...")
|
|
if not os.path.exists(HISTORY_PATH):
|
|
return {"error": "No historical data found. Run history collector first."}
|
|
|
|
with open(HISTORY_PATH) as f:
|
|
history = json.load(f)
|
|
|
|
index = _build_daily_index(history)
|
|
|
|
# Build price lookup (prefer coingecko for completeness)
|
|
price_lookup = {}
|
|
for pk in ["btc_price_coingecko", "btc_price", "btc_price_sma", "btc_price_lth"]:
|
|
if pk in index:
|
|
for d, v in index[pk].items():
|
|
if d not in price_lookup:
|
|
price_lookup[d] = v
|
|
|
|
all_dates = _get_all_dates(index)
|
|
if not all_dates:
|
|
return {"error": "No date data available."}
|
|
|
|
log.info("Date range: %s to %s (%d days)", all_dates[0], all_dates[-1], len(all_dates))
|
|
|
|
# Compute drawdowns
|
|
drawdowns = _compute_ath_series(price_lookup, all_dates)
|
|
|
|
# Precompute forward returns
|
|
log.info("Computing forward returns...")
|
|
fwd_returns = compute_forward_returns(price_lookup, all_dates)
|
|
|
|
# Score each day
|
|
log.info("Scoring %d days...", len(all_dates))
|
|
daily_scores = []
|
|
for d in all_dates:
|
|
composite, details, n_metrics = score_day(d, index, drawdowns)
|
|
if composite is not None and n_metrics >= 3: # Require at least 3 metrics
|
|
price = price_lookup.get(d)
|
|
entry = {
|
|
"date": d,
|
|
"score": composite,
|
|
"n_metrics": n_metrics,
|
|
"price": price,
|
|
"forward_returns": fwd_returns.get(d, {}),
|
|
}
|
|
daily_scores.append(entry)
|
|
|
|
if not daily_scores:
|
|
return {"error": "No scored days (insufficient metric overlap)."}
|
|
|
|
log.info("Scored %d days with 3+ metrics", len(daily_scores))
|
|
|
|
# --- Bracket statistics ---
|
|
bracket_stats = []
|
|
for low, high, label in BRACKETS:
|
|
days_in = [d for d in daily_scores if low <= d["score"] <= high]
|
|
if not days_in:
|
|
bracket_stats.append({
|
|
"range": f"{low}-{high}", "label": label, "days": 0,
|
|
})
|
|
continue
|
|
|
|
stats = {"range": f"{low}-{high}", "label": label, "days": len(days_in)}
|
|
for period in ["30d", "90d", "180d", "365d"]:
|
|
returns = [d["forward_returns"][period] for d in days_in if period in d["forward_returns"]]
|
|
if returns:
|
|
returns_sorted = sorted(returns)
|
|
stats[f"avg_{period}"] = round(sum(returns) / len(returns), 2)
|
|
stats[f"median_{period}"] = round(returns_sorted[len(returns_sorted) // 2], 2)
|
|
stats[f"win_rate_{period}"] = round(len([r for r in returns if r > 0]) / len(returns) * 100, 1)
|
|
stats[f"max_gain_{period}"] = round(max(returns), 2)
|
|
stats[f"max_loss_{period}"] = round(min(returns), 2)
|
|
stats[f"n_{period}"] = len(returns)
|
|
|
|
# Average max drawdown within 90 days
|
|
dd_list = []
|
|
for d in days_in:
|
|
dd = compute_max_drawdown_forward(price_lookup, d["date"], 90)
|
|
if dd is not None:
|
|
dd_list.append(dd)
|
|
if dd_list:
|
|
stats["avg_max_drawdown_90d"] = round(sum(dd_list) / len(dd_list), 2)
|
|
|
|
bracket_stats.append(stats)
|
|
|
|
# --- Peak signal events ---
|
|
signal_events = []
|
|
thresholds = [90, 80, 70]
|
|
for thresh in thresholds:
|
|
prev_score = 0
|
|
for d in daily_scores:
|
|
if d["score"] >= thresh and prev_score < thresh:
|
|
event = {
|
|
"date": d["date"],
|
|
"score": d["score"],
|
|
"threshold": thresh,
|
|
"price": d["price"],
|
|
"forward_returns": d["forward_returns"],
|
|
}
|
|
# Add future prices
|
|
if d["price"]:
|
|
dt = datetime.strptime(d["date"], "%Y-%m-%d")
|
|
for days_ahead in [30, 90, 365]:
|
|
future = (dt + timedelta(days=days_ahead)).strftime("%Y-%m-%d")
|
|
fp = price_lookup.get(future)
|
|
if fp:
|
|
event[f"price_{days_ahead}d"] = round(fp, 2)
|
|
signal_events.append(event)
|
|
prev_score = d["score"]
|
|
|
|
signal_events.sort(key=lambda e: e["date"])
|
|
|
|
# --- Current signal context ---
|
|
all_scores_list = [d["score"] for d in daily_scores]
|
|
all_scores_list.sort()
|
|
|
|
# Get current score from cache
|
|
current_score = None
|
|
current_price = None
|
|
if os.path.exists(CACHE_PATH):
|
|
try:
|
|
with open(CACHE_PATH) as f:
|
|
cache = json.load(f)
|
|
scored = cache.get("_scored", {})
|
|
current_score = scored.get("composite_score")
|
|
current_price = cache.get("price", {}).get("price")
|
|
except Exception:
|
|
pass
|
|
|
|
# If no cache, use latest daily score
|
|
if current_score is None and daily_scores:
|
|
current_score = daily_scores[-1]["score"]
|
|
current_price = daily_scores[-1].get("price")
|
|
|
|
current_context = None
|
|
if current_score is not None:
|
|
# Percentile
|
|
below = len([s for s in all_scores_list if s <= current_score])
|
|
percentile = round(below / len(all_scores_list) * 100, 1)
|
|
|
|
# Find comparable historical periods
|
|
comparable = []
|
|
margin = 5
|
|
for d in daily_scores:
|
|
if abs(d["score"] - current_score) <= margin and d["forward_returns"]:
|
|
comparable.append(d)
|
|
|
|
avg_returns = {}
|
|
if comparable:
|
|
for period in ["30d", "90d", "180d", "365d"]:
|
|
vals = [d["forward_returns"][period] for d in comparable if period in d["forward_returns"]]
|
|
if vals:
|
|
avg_returns[period] = round(sum(vals) / len(vals), 2)
|
|
avg_1yr = avg_returns.get("365d")
|
|
|
|
# Best comparable examples — one per market cycle for diversity
|
|
# Cycles: pre-2016, 2016-2017 bull, 2018-2019 bear, 2020-2021 bull, 2022-2023 bear, 2024+
|
|
cycle_bins = [
|
|
("pre-2016", "2010-01-01", "2015-12-31"),
|
|
("2016-17 Bull", "2016-01-01", "2017-12-31"),
|
|
("2018-19 Bear", "2018-01-01", "2019-12-31"),
|
|
("2020-21 Bull", "2020-01-01", "2021-12-31"),
|
|
("2022-23 Bear", "2022-01-01", "2023-12-31"),
|
|
("2024+", "2024-01-01", "2099-12-31"),
|
|
]
|
|
examples = []
|
|
used_cycles = set()
|
|
# Sort comparable by closest score first, then pick one per cycle
|
|
sorted_comp = sorted(comparable, key=lambda d: abs(d["score"] - current_score))
|
|
for d in sorted_comp:
|
|
cycle_label = None
|
|
for label, start, end in cycle_bins:
|
|
if start <= d["date"] <= end:
|
|
cycle_label = label
|
|
break
|
|
if cycle_label and cycle_label not in used_cycles:
|
|
used_cycles.add(cycle_label)
|
|
examples.append({
|
|
"date": d["date"],
|
|
"score": d["score"],
|
|
"price": d["price"],
|
|
"forward_returns": d["forward_returns"],
|
|
"cycle": cycle_label,
|
|
})
|
|
if len(examples) >= 6:
|
|
break
|
|
# Sort examples chronologically
|
|
examples.sort(key=lambda d: d["date"])
|
|
|
|
current_context = {
|
|
"current_score": current_score,
|
|
"current_price": current_price,
|
|
"percentile": percentile,
|
|
"comparable_days": len(comparable),
|
|
"avg_1yr_return": avg_1yr,
|
|
"avg_30d_return": avg_returns.get("30d"),
|
|
"avg_90d_return": avg_returns.get("90d"),
|
|
"avg_180d_return": avg_returns.get("180d"),
|
|
"examples": examples,
|
|
}
|
|
|
|
# --- Build time series for charting ---
|
|
# Smart downsampling: daily for last 2 years, weekly before that
|
|
chart_data = []
|
|
import datetime as _dt
|
|
try:
|
|
last_date = _dt.datetime.strptime(daily_scores[-1]["date"], "%Y-%m-%d")
|
|
cutoff_date = (last_date - _dt.timedelta(days=730)).strftime("%Y-%m-%d")
|
|
except Exception:
|
|
cutoff_date = "2024-01-01"
|
|
for i, d in enumerate(daily_scores):
|
|
is_recent = d["date"] >= cutoff_date
|
|
if is_recent or i % 7 == 0 or i == len(daily_scores) - 1:
|
|
chart_data.append({
|
|
"date": d["date"],
|
|
"score": d["score"],
|
|
"price": d["price"],
|
|
})
|
|
|
|
result = {
|
|
"date_range": {"start": daily_scores[0]["date"], "end": daily_scores[-1]["date"]},
|
|
"total_days_scored": len(daily_scores),
|
|
"bracket_stats": bracket_stats,
|
|
"signal_events": signal_events,
|
|
"current_context": current_context,
|
|
"chart_data": chart_data,
|
|
"computed_at": datetime.utcnow().isoformat() + "Z",
|
|
}
|
|
|
|
log.info("Backtest complete: %d days, %d signal events", len(daily_scores), len(signal_events))
|
|
return result
|