"""Historical backtest engine for Bitcoin Accumulation Zone scoring.""" import json import logging import os import sys from collections import defaultdict from datetime import datetime, timedelta log = logging.getLogger(__name__) BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, BASE_DIR) HISTORY_PATH = os.path.join(BASE_DIR, "data", "history.json") CACHE_PATH = os.path.join(BASE_DIR, "data", "cache.json") # Score brackets matching the dashboard assessment levels BRACKETS = [ (0, 20, "Extreme Caution"), (21, 40, "Caution"), (41, 55, "Neutral"), (56, 70, "Moderate Opportunity"), (71, 85, "Strong Accumulation"), (86, 100, "Extreme Accumulation"), ] # Scoring thresholds — replicated from scoring/engine.py for standalone use METRIC_SCORERS = { "fear_greed": { "ranges": [[None, 10, 10], [10, 25, 7], [25, 45, 4], [45, 55, 2], [55, 75, 1], [75, None, 0]], }, "puell_multiple": { "ranges": [[None, 0.3, 10], [0.3, 0.5, 8], [0.5, 0.8, 5], [0.8, 1.2, 3], [1.2, 2.0, 1], [2.0, None, 0]], }, "mvrv_zscore": { "ranges": [[None, 0, 10], [0, 0.5, 8], [0.5, 1.5, 5], [1.5, 3, 2], [3, 5, 1], [5, None, 0]], }, "reserve_risk": { "ranges": [[None, 0.002, 10], [0.002, 0.005, 7], [0.005, 0.01, 4], [0.01, 0.02, 2], [0.02, None, 0]], }, "rhodl_ratio": { "ranges": [[None, 100, 10], [100, 500, 7], [500, 2000, 4], [2000, 10000, 1], [10000, None, 0]], }, "nupl": { "ranges": [[None, 0, 10], [0, 0.25, 7], [0.25, 0.5, 4], [0.5, 0.75, 1], [0.75, None, 0]], }, } # Ratio-based metrics: score based on price vs reference value RATIO_SCORERS = { "price_vs_200w_sma": { # pct_above ranges "ranges": [[None, 0, 10], [0, 20, 6], [20, 50, 3], [50, 100, 1], [100, None, 0]], "price_key": "btc_price", "ref_key": "200w_sma", }, "lth_realized_price": { "ranges": [[None, 0, 10], [0, 20, 6], [20, 50, 3], [50, None, 1]], "price_key": "btc_price", "ref_key": "lth_realized_price", }, } # Drawdown scoring DRAWDOWN_RANGES = [[70, None, 10], [50, 70, 8], [30, 50, 6], [20, 30, 4], [10, 20, 2], [None, 10, 0]] def _score_range(value, ranges): """Score a value using range-based thresholds.""" if value is None: return None for low, high, score in ranges: low_ok = low is None or value >= low high_ok = high is None or value < high if low_ok and high_ok: return score return 0 def _build_daily_index(history): """Build a dict mapping metric_key -> {date_str: value} for fast lookup.""" index = {} for key, data in history.items(): if key.startswith("_") or not isinstance(data, dict) or "dates" not in data: continue lookup = {} for d, v in zip(data["dates"], data["values"]): lookup[d] = v index[key] = lookup return index def _get_all_dates(index): """Get sorted union of all dates across all metrics.""" all_dates = set() for lookup in index.values(): all_dates.update(lookup.keys()) return sorted(all_dates) def _last_known_value(lookup, date, max_lookback=30): """Get value for date, or most recent prior value within lookback window.""" if date in lookup: return lookup[date] d = datetime.strptime(date, "%Y-%m-%d") for i in range(1, max_lookback + 1): prev = (d - timedelta(days=i)).strftime("%Y-%m-%d") if prev in lookup: return lookup[prev] return None def _compute_ath_series(price_lookup, dates): """Compute running ATH and drawdown for each date.""" ath = 0 drawdowns = {} for d in dates: p = price_lookup.get(d) if p is None: continue if p > ath: ath = p if ath > 0: drawdowns[d] = ((ath - p) / ath) * 100 return drawdowns def score_day(date, index, drawdowns): """Score a single day using all available metrics. Returns (composite_score, individual_scores, n_metrics).""" scores = [] details = {} # Simple range-based metrics for metric_key, cfg in METRIC_SCORERS.items(): val = _last_known_value(index.get(metric_key, {}), date) if val is not None: s = _score_range(val, cfg["ranges"]) if s is not None: scores.append(s) details[metric_key] = {"value": val, "score": s} # Ratio-based metrics (price vs reference) for metric_key, cfg in RATIO_SCORERS.items(): price_val = _last_known_value(index.get(cfg["price_key"], {}), date) # Try alternate price keys if price_val is None: for pk in ["btc_price_coingecko", "btc_price_sma", "btc_price_lth"]: price_val = _last_known_value(index.get(pk, {}), date) if price_val is not None: break ref_val = _last_known_value(index.get(cfg["ref_key"], {}), date) if price_val is not None and ref_val is not None and ref_val > 0: pct_above = ((price_val - ref_val) / ref_val) * 100 s = _score_range(pct_above, cfg["ranges"]) if s is not None: scores.append(s) details[metric_key] = {"value": pct_above, "score": s} # Drawdown dd = drawdowns.get(date) if dd is not None: s = _score_range(dd, DRAWDOWN_RANGES) if s is not None: scores.append(s) details["drawdown"] = {"value": dd, "score": s} if not scores: return None, details, 0 composite = sum(scores) / len(scores) * 10 return round(composite, 1), details, len(scores) def compute_forward_returns(price_lookup, dates_sorted): """Precompute forward returns for all dates.""" periods = [30, 90, 180, 365] returns = {} for d in dates_sorted: p0 = price_lookup.get(d) if p0 is None or p0 <= 0: continue r = {} dt = datetime.strptime(d, "%Y-%m-%d") for days in periods: future = (dt + timedelta(days=days)).strftime("%Y-%m-%d") pf = price_lookup.get(future) if pf is not None: r[f"{days}d"] = round(((pf - p0) / p0) * 100, 2) if r: returns[d] = r return returns def compute_max_drawdown_forward(price_lookup, date, window=90): """Compute max drawdown within N days after a given date.""" dt = datetime.strptime(date, "%Y-%m-%d") p0 = price_lookup.get(date) if p0 is None or p0 <= 0: return None peak = p0 max_dd = 0 for i in range(1, window + 1): future = (dt + timedelta(days=i)).strftime("%Y-%m-%d") pf = price_lookup.get(future) if pf is None: continue if pf > peak: peak = pf dd = ((peak - pf) / peak) * 100 if dd > max_dd: max_dd = dd return round(max_dd, 2) if max_dd > 0 else 0 def run_backtest(): """Run the full backtest and return comprehensive results.""" log.info("Loading historical data...") if not os.path.exists(HISTORY_PATH): return {"error": "No historical data found. Run history collector first."} with open(HISTORY_PATH) as f: history = json.load(f) index = _build_daily_index(history) # Build price lookup (prefer coingecko for completeness) price_lookup = {} for pk in ["btc_price_coingecko", "btc_price", "btc_price_sma", "btc_price_lth"]: if pk in index: for d, v in index[pk].items(): if d not in price_lookup: price_lookup[d] = v all_dates = _get_all_dates(index) if not all_dates: return {"error": "No date data available."} log.info("Date range: %s to %s (%d days)", all_dates[0], all_dates[-1], len(all_dates)) # Compute drawdowns drawdowns = _compute_ath_series(price_lookup, all_dates) # Precompute forward returns log.info("Computing forward returns...") fwd_returns = compute_forward_returns(price_lookup, all_dates) # Score each day log.info("Scoring %d days...", len(all_dates)) daily_scores = [] for d in all_dates: composite, details, n_metrics = score_day(d, index, drawdowns) if composite is not None and n_metrics >= 3: # Require at least 3 metrics price = price_lookup.get(d) entry = { "date": d, "score": composite, "n_metrics": n_metrics, "price": price, "forward_returns": fwd_returns.get(d, {}), } daily_scores.append(entry) if not daily_scores: return {"error": "No scored days (insufficient metric overlap)."} log.info("Scored %d days with 3+ metrics", len(daily_scores)) # --- Bracket statistics --- bracket_stats = [] for low, high, label in BRACKETS: days_in = [d for d in daily_scores if low <= d["score"] <= high] if not days_in: bracket_stats.append({ "range": f"{low}-{high}", "label": label, "days": 0, }) continue stats = {"range": f"{low}-{high}", "label": label, "days": len(days_in)} for period in ["30d", "90d", "180d", "365d"]: returns = [d["forward_returns"][period] for d in days_in if period in d["forward_returns"]] if returns: returns_sorted = sorted(returns) stats[f"avg_{period}"] = round(sum(returns) / len(returns), 2) stats[f"median_{period}"] = round(returns_sorted[len(returns_sorted) // 2], 2) stats[f"win_rate_{period}"] = round(len([r for r in returns if r > 0]) / len(returns) * 100, 1) stats[f"max_gain_{period}"] = round(max(returns), 2) stats[f"max_loss_{period}"] = round(min(returns), 2) stats[f"n_{period}"] = len(returns) # Average max drawdown within 90 days dd_list = [] for d in days_in: dd = compute_max_drawdown_forward(price_lookup, d["date"], 90) if dd is not None: dd_list.append(dd) if dd_list: stats["avg_max_drawdown_90d"] = round(sum(dd_list) / len(dd_list), 2) bracket_stats.append(stats) # --- Peak signal events --- signal_events = [] thresholds = [90, 80, 70] for thresh in thresholds: prev_score = 0 for d in daily_scores: if d["score"] >= thresh and prev_score < thresh: event = { "date": d["date"], "score": d["score"], "threshold": thresh, "price": d["price"], "forward_returns": d["forward_returns"], } # Add future prices if d["price"]: dt = datetime.strptime(d["date"], "%Y-%m-%d") for days_ahead in [30, 90, 365]: future = (dt + timedelta(days=days_ahead)).strftime("%Y-%m-%d") fp = price_lookup.get(future) if fp: event[f"price_{days_ahead}d"] = round(fp, 2) signal_events.append(event) prev_score = d["score"] signal_events.sort(key=lambda e: e["date"]) # --- Current signal context --- all_scores_list = [d["score"] for d in daily_scores] all_scores_list.sort() # Get current score from cache current_score = None current_price = None if os.path.exists(CACHE_PATH): try: with open(CACHE_PATH) as f: cache = json.load(f) scored = cache.get("_scored", {}) current_score = scored.get("composite_score") current_price = cache.get("price", {}).get("price") except Exception: pass # If no cache, use latest daily score if current_score is None and daily_scores: current_score = daily_scores[-1]["score"] current_price = daily_scores[-1].get("price") current_context = None if current_score is not None: # Percentile below = len([s for s in all_scores_list if s <= current_score]) percentile = round(below / len(all_scores_list) * 100, 1) # Find comparable historical periods comparable = [] margin = 5 for d in daily_scores: if abs(d["score"] - current_score) <= margin and d["forward_returns"]: comparable.append(d) avg_returns = {} if comparable: for period in ["30d", "90d", "180d", "365d"]: vals = [d["forward_returns"][period] for d in comparable if period in d["forward_returns"]] if vals: avg_returns[period] = round(sum(vals) / len(vals), 2) avg_1yr = avg_returns.get("365d") # Best comparable examples (most recent 5) examples = [] for d in comparable[-5:]: examples.append({ "date": d["date"], "score": d["score"], "price": d["price"], "forward_returns": d["forward_returns"], }) current_context = { "current_score": current_score, "current_price": current_price, "percentile": percentile, "comparable_days": len(comparable), "avg_1yr_return": avg_1yr, "avg_30d_return": avg_returns.get("30d"), "avg_90d_return": avg_returns.get("90d"), "avg_180d_return": avg_returns.get("180d"), "examples": examples, } # --- Build time series for charting --- # Downsample to weekly for chart efficiency chart_data = [] for i, d in enumerate(daily_scores): # Include every 7th day + last day if i % 7 == 0 or i == len(daily_scores) - 1: chart_data.append({ "date": d["date"], "score": d["score"], "price": d["price"], }) result = { "date_range": {"start": daily_scores[0]["date"], "end": daily_scores[-1]["date"]}, "total_days_scored": len(daily_scores), "bracket_stats": bracket_stats, "signal_events": signal_events, "current_context": current_context, "chart_data": chart_data, "computed_at": datetime.utcnow().isoformat() + "Z", } log.info("Backtest complete: %d days, %d signal events", len(daily_scores), len(signal_events)) return result