"""Scoring engine for Bitcoin accumulation zone metrics.""" import json import os import logging log = logging.getLogger(__name__) THRESHOLDS_PATH = os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config", "thresholds.json", ) def load_thresholds(): """Load scoring thresholds from config.""" try: with open(THRESHOLDS_PATH) as f: return json.load(f) except Exception: return {} def _score_range(value, ranges): """Score a value using range-based thresholds. Each range is [low, high, score]. null means unbounded. """ if value is None: return None for low, high, score in ranges: low_ok = low is None or value >= low high_ok = high is None or value < high if low_ok and high_ok: return score return 0 def _score_range_inverted(value, ranges): """Score where higher value = lower range index (for drawdown).""" if value is None: return None for low, high, score in ranges: low_ok = low is None or value >= low high_ok = high is None or value < high if low_ok and high_ok: return score return 0 def score_fear_greed(value, thresholds=None): """Score Fear & Greed index (0-100 input, 0-10 output).""" if value is None: return None, "No data" t = (thresholds or load_thresholds()).get("fear_greed", {}) ranges = t.get("ranges", [[0, 10, 10], [11, 25, 7], [26, 45, 4], [46, 55, 2], [56, 75, 1], [76, 100, 0]]) score = _score_range(value, ranges) if value <= 10: desc = "Extreme Fear — historically excellent buying" elif value <= 25: desc = "Fear — good accumulation territory" elif value <= 45: desc = "Low neutral — moderate opportunity" elif value <= 55: desc = "Neutral" elif value <= 75: desc = "Greed — caution" else: desc = "Extreme Greed — poor time to accumulate" return score, desc def score_puell_multiple(value, thresholds=None): if value is None: return None, "No data" t = (thresholds or load_thresholds()).get("puell_multiple", {}) ranges = t.get("ranges", [[None, 0.3, 10], [0.3, 0.5, 8], [0.5, 0.8, 5], [0.8, 1.2, 3], [1.2, 2.0, 1], [2.0, None, 0]]) score = _score_range(value, ranges) if value < 0.3: desc = "Deep value — miners under extreme stress" elif value < 0.5: desc = "Low — miners selling below average" elif value < 0.8: desc = "Below average miner revenue" elif value < 1.2: desc = "Average miner revenue" elif value < 2.0: desc = "Above average — miners profiting well" else: desc = "Elevated — potential top signal" return score, desc def score_mvrv_zscore(value, thresholds=None): if value is None: return None, "No data" t = (thresholds or load_thresholds()).get("mvrv_zscore", {}) ranges = t.get("ranges", [[None, 0, 10], [0, 0.5, 8], [0.5, 1.5, 5], [1.5, 3, 2], [3, 5, 1], [5, None, 0]]) score = _score_range(value, ranges) if value < 0: desc = "Below realized value — historically perfect buy zone" elif value < 0.5: desc = "Near realized value — strong accumulation" elif value < 1.5: desc = "Fair value range" elif value < 3: desc = "Above fair value" elif value < 5: desc = "Overvalued territory" else: desc = "Extreme overvaluation — cycle top territory" return score, desc def score_drawdown(value, thresholds=None): """Score drawdown from ATH (value is % drawdown, e.g. 50 = 50% below ATH).""" if value is None: return None, "No data" t = (thresholds or load_thresholds()).get("drawdown", {}) ranges = t.get("ranges", [[70, None, 10], [50, 70, 8], [30, 50, 6], [20, 30, 4], [10, 20, 2], [None, 10, 0]]) score = _score_range(value, ranges) if value > 70: desc = f"{value:.0f}% below ATH — extreme capitulation" elif value > 50: desc = f"{value:.0f}% below ATH — deep bear market" elif value > 30: desc = f"{value:.0f}% below ATH — significant correction" elif value > 20: desc = f"{value:.0f}% below ATH — moderate pullback" elif value > 10: desc = f"{value:.0f}% below ATH — minor dip" else: desc = f"{value:.0f}% below ATH — near all-time high" return score, desc def score_price_vs_200w_sma(price, sma_200w, thresholds=None): """Score price relative to 200-week SMA.""" if price is None or sma_200w is None or sma_200w == 0: return None, "No data" pct_above = ((price - sma_200w) / sma_200w) * 100 t = (thresholds or load_thresholds()).get("price_vs_200w_sma", {}) ranges = t.get("ranges", [[None, 0, 10], [0, 20, 6], [20, 50, 3], [50, 100, 1], [100, None, 0]]) score = _score_range(pct_above, ranges) if pct_above < 0: desc = f"Below 200W SMA — historically rare buy zone" elif pct_above < 20: desc = f"{pct_above:.0f}% above 200W SMA — good value" elif pct_above < 50: desc = f"{pct_above:.0f}% above 200W SMA — moderate" elif pct_above < 100: desc = f"{pct_above:.0f}% above 200W SMA — extended" else: desc = f"{pct_above:.0f}% above 200W SMA — extremely overheated" return score, desc def score_reserve_risk(value, thresholds=None): if value is None: return None, "No data" t = (thresholds or load_thresholds()).get("reserve_risk", {}) ranges = t.get("ranges", [[None, 0.002, 10], [0.002, 0.005, 7], [0.005, 0.01, 4], [0.01, 0.02, 2], [0.02, None, 0]]) score = _score_range(value, ranges) if value < 0.002: desc = "Very low risk/reward — strong accumulation" elif value < 0.005: desc = "Low risk — good entry" elif value < 0.01: desc = "Moderate risk/reward" elif value < 0.02: desc = "Elevated risk" else: desc = "High risk — cycle top territory" return score, desc def score_rhodl_ratio(value, thresholds=None): if value is None: return None, "No data" t = (thresholds or load_thresholds()).get("rhodl_ratio", {}) ranges = t.get("ranges", [[None, 100, 10], [100, 500, 7], [500, 2000, 4], [2000, 10000, 1], [10000, None, 0]]) score = _score_range(value, ranges) if value < 100: desc = "Extreme low — long-term holders dominate" elif value < 500: desc = "Low — mature holder confidence" elif value < 2000: desc = "Moderate rotation" elif value < 10000: desc = "Elevated — new money entering" else: desc = "Extreme — speculative mania" return score, desc def score_nupl(value, thresholds=None): if value is None: return None, "No data" t = (thresholds or load_thresholds()).get("nupl", {}) ranges = t.get("ranges", [[None, 0, 10], [0, 0.25, 7], [0.25, 0.5, 4], [0.5, 0.75, 1], [0.75, None, 0]]) score = _score_range(value, ranges) if value < 0: desc = "Capitulation — holders underwater" elif value < 0.25: desc = "Hope/Fear — early recovery" elif value < 0.5: desc = "Optimism — moderate profit taking" elif value < 0.75: desc = "Belief/Greed — significant unrealized gains" else: desc = "Euphoria — extreme unrealized profit" return score, desc def score_lth_realized_price(price, lth_rp, thresholds=None): """Score price relative to Long-Term Holder realized price.""" if price is None or lth_rp is None or lth_rp == 0: return None, "No data" pct_above = ((price - lth_rp) / lth_rp) * 100 t = (thresholds or load_thresholds()).get("lth_realized_price", {}) ranges = t.get("ranges", [[None, 0, 10], [0, 20, 6], [20, 50, 3], [50, None, 1]]) score = _score_range(pct_above, ranges) if pct_above < 0: desc = f"Below LTH cost basis — LTHs underwater (extreme value)" elif pct_above < 20: desc = f"{pct_above:.0f}% above LTH cost basis — good value" elif pct_above < 50: desc = f"{pct_above:.0f}% above LTH cost basis — moderate" else: desc = f"{pct_above:.0f}% above LTH cost basis — extended" return score, desc def score_hash_ribbons(data, thresholds=None): """Score hash ribbons based on buy signal detection.""" if not data: return None, "No data" if data.get("buy_signal"): return 10, "Active buy signal — miner capitulation recovery" return 3, "Normal mining activity" def score_all(metrics): """Score all metrics and return individual + composite scores.""" thresholds = load_thresholds() results = [] # Fear & Greed fg = metrics.get("fear_greed", {}) fg_score, fg_desc = score_fear_greed(fg.get("value"), thresholds) results.append({ "name": "Fear & Greed Index", "key": "fear_greed", "value": fg.get("value"), "display_value": f"{fg.get('value', 'N/A')} — {fg.get('classification', '')}", "score": fg_score, "description": fg_desc, "recent": fg.get("recent", []), }) # Puell Multiple pm = metrics.get("puell_multiple", {}) pm_score, pm_desc = score_puell_multiple(pm.get("value"), thresholds) results.append({ "name": "Puell Multiple", "key": "puell_multiple", "value": pm.get("value"), "display_value": f"{pm.get('value', 'N/A'):.4f}" if pm.get("value") is not None else "N/A", "score": pm_score, "description": pm_desc, "recent": pm.get("recent", []), }) # MVRV Z-Score mz = metrics.get("mvrv_zscore", {}) mz_score, mz_desc = score_mvrv_zscore(mz.get("value"), thresholds) results.append({ "name": "MVRV Z-Score", "key": "mvrv_zscore", "value": mz.get("value"), "display_value": f"{mz.get('value', 'N/A'):.2f}" if mz.get("value") is not None else "N/A", "score": mz_score, "description": mz_desc, "recent": mz.get("recent", []), }) # Drawdown from ATH dd = metrics.get("drawdown", {}) dd_score, dd_desc = score_drawdown(dd.get("value"), thresholds) results.append({ "name": "Drawdown from ATH", "key": "drawdown", "value": dd.get("value"), "display_value": f"{dd.get('value', 0):.1f}%" if dd.get("value") is not None else "N/A", "score": dd_score, "description": dd_desc, "recent": [], }) # Price vs 200W SMA sma = metrics.get("200w_sma", {}) price_data = metrics.get("price", {}) current_price = price_data.get("price") or sma.get("btc_price") sma_val = sma.get("value") sma_score, sma_desc = score_price_vs_200w_sma(current_price, sma_val, thresholds) results.append({ "name": "Price vs 200W SMA", "key": "price_vs_200w_sma", "value": sma_val, "display_value": f"${sma_val:,.0f}" if sma_val else "N/A", "score": sma_score, "description": sma_desc, "recent": sma.get("recent", []), }) # Reserve Risk rr = metrics.get("reserve_risk", {}) rr_score, rr_desc = score_reserve_risk(rr.get("value"), thresholds) results.append({ "name": "Reserve Risk", "key": "reserve_risk", "value": rr.get("value"), "display_value": f"{rr.get('value', 'N/A'):.6f}" if rr.get("value") is not None else "N/A", "score": rr_score, "description": rr_desc, "recent": rr.get("recent", []), }) # RHODL Ratio rh = metrics.get("rhodl_ratio", {}) rh_score, rh_desc = score_rhodl_ratio(rh.get("value"), thresholds) results.append({ "name": "RHODL Ratio", "key": "rhodl_ratio", "value": rh.get("value"), "display_value": f"{rh.get('value', 'N/A'):.0f}" if rh.get("value") is not None else "N/A", "score": rh_score, "description": rh_desc, "recent": rh.get("recent", []), }) # NUPL nu = metrics.get("nupl", {}) nu_score, nu_desc = score_nupl(nu.get("value"), thresholds) results.append({ "name": "Net Unrealized Profit/Loss", "key": "nupl", "value": nu.get("value"), "display_value": f"{nu.get('value', 'N/A'):.4f}" if nu.get("value") is not None else "N/A", "score": nu_score, "description": nu_desc, "recent": nu.get("recent", []), }) # LTH Realized Price lth = metrics.get("lth_realized_price", {}) lth_price = lth.get("btc_price") or current_price lth_rp = lth.get("value") lth_score, lth_desc = score_lth_realized_price(lth_price, lth_rp, thresholds) results.append({ "name": "LTH Realized Price", "key": "lth_realized_price", "value": lth_rp, "display_value": f"${lth_rp:,.0f}" if lth_rp else "N/A", "score": lth_score, "description": lth_desc, "recent": lth.get("recent", []), }) # Hash Ribbons hr = metrics.get("hash_ribbons", {}) hr_score, hr_desc = score_hash_ribbons(hr, thresholds) results.append({ "name": "Hash Ribbons", "key": "hash_ribbons", "value": None, "display_value": "Buy Signal" if hr.get("buy_signal") else "Normal", "score": hr_score, "description": hr_desc, "recent": [], }) # Compute composite valid_scores = [r["score"] for r in results if r["score"] is not None] if valid_scores: # Scale to 0-100 based on available metrics composite = sum(valid_scores) / len(valid_scores) * 10 else: composite = 0 # Assessment text if composite >= 71: assessment = "STRONG ACCUMULATION ZONE" elif composite >= 51: assessment = "MODERATE OPPORTUNITY" elif composite >= 31: assessment = "NEUTRAL" elif composite >= 15: assessment = "CAUTION — OVERHEATED" else: assessment = "EXTREME CAUTION" return { "metrics": results, "composite_score": round(composite, 1), "assessment": assessment, "scored_count": len(valid_scores), "total_count": len(results), }