feat: replace ML optimizer with on-chain accumulation zone monitor

Complete rewrite — replaces the ML-based signal optimizer with a transparent
on-chain metric monitoring dashboard. Scrapes 10 metrics from LookIntoBitcoin
(Playwright) and free APIs, scores each 0-10, composite 0-100.

Metrics: Fear & Greed, Puell Multiple, MVRV Z-Score, Drawdown from ATH,
Price vs 200W SMA, Reserve Risk, RHODL Ratio, NUPL, LTH Realized Price,
Hash Ribbons. Auto-refreshes every 15 minutes. Settings page preserved.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
BizzleBot 2026-03-20 22:31:29 +00:00
parent aba30f7718
commit 62e32fc655
14 changed files with 1374 additions and 634 deletions

35
config/thresholds.json Normal file
View File

@ -0,0 +1,35 @@
{
"fear_greed": {
"ranges": [[0, 10, 10], [11, 25, 7], [26, 45, 4], [46, 55, 2], [56, 75, 1], [76, 100, 0]]
},
"puell_multiple": {
"ranges": [[null, 0.3, 10], [0.3, 0.5, 8], [0.5, 0.8, 5], [0.8, 1.2, 3], [1.2, 2.0, 1], [2.0, null, 0]]
},
"mvrv_zscore": {
"ranges": [[null, 0, 10], [0, 0.5, 8], [0.5, 1.5, 5], [1.5, 3, 2], [3, 5, 1], [5, null, 0]]
},
"drawdown": {
"ranges": [[70, null, 10], [50, 70, 8], [30, 50, 6], [20, 30, 4], [10, 20, 2], [null, 10, 0]]
},
"price_vs_200w_sma": {
"ranges": [[null, 0, 10], [0, 20, 6], [20, 50, 3], [50, 100, 1], [100, null, 0]]
},
"reserve_risk": {
"ranges": [[null, 0.002, 10], [0.002, 0.005, 7], [0.005, 0.01, 4], [0.01, 0.02, 2], [0.02, null, 0]]
},
"rhodl_ratio": {
"ranges": [[null, 100, 10], [100, 500, 7], [500, 2000, 4], [2000, 10000, 1], [10000, null, 0]]
},
"nupl": {
"ranges": [[null, 0, 10], [0, 0.25, 7], [0.25, 0.5, 4], [0.5, 0.75, 1], [0.75, null, 0]]
},
"lth_realized_price": {
"ranges": [[null, 0, 10], [0, 20, 6], [20, 50, 3], [50, null, 1]]
},
"hash_ribbons": {
"buy_signal": 10,
"recent_recovery": 6,
"normal": 3,
"euphoria": 0
}
}

File diff suppressed because it is too large Load Diff

0
scoring/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

418
scoring/engine.py Normal file
View File

@ -0,0 +1,418 @@
"""Scoring engine for Bitcoin accumulation zone metrics."""
import json
import os
import logging
log = logging.getLogger(__name__)
THRESHOLDS_PATH = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"config",
"thresholds.json",
)
def load_thresholds():
"""Load scoring thresholds from config."""
try:
with open(THRESHOLDS_PATH) as f:
return json.load(f)
except Exception:
return {}
def _score_range(value, ranges):
"""Score a value using range-based thresholds.
Each range is [low, high, score]. null means unbounded.
"""
if value is None:
return None
for low, high, score in ranges:
low_ok = low is None or value >= low
high_ok = high is None or value < high
if low_ok and high_ok:
return score
return 0
def _score_range_inverted(value, ranges):
"""Score where higher value = lower range index (for drawdown)."""
if value is None:
return None
for low, high, score in ranges:
low_ok = low is None or value >= low
high_ok = high is None or value < high
if low_ok and high_ok:
return score
return 0
def score_fear_greed(value, thresholds=None):
"""Score Fear & Greed index (0-100 input, 0-10 output)."""
if value is None:
return None, "No data"
t = (thresholds or load_thresholds()).get("fear_greed", {})
ranges = t.get("ranges", [[0, 10, 10], [11, 25, 7], [26, 45, 4], [46, 55, 2], [56, 75, 1], [76, 100, 0]])
score = _score_range(value, ranges)
if value <= 10:
desc = "Extreme Fear — historically excellent buying"
elif value <= 25:
desc = "Fear — good accumulation territory"
elif value <= 45:
desc = "Low neutral — moderate opportunity"
elif value <= 55:
desc = "Neutral"
elif value <= 75:
desc = "Greed — caution"
else:
desc = "Extreme Greed — poor time to accumulate"
return score, desc
def score_puell_multiple(value, thresholds=None):
if value is None:
return None, "No data"
t = (thresholds or load_thresholds()).get("puell_multiple", {})
ranges = t.get("ranges", [[None, 0.3, 10], [0.3, 0.5, 8], [0.5, 0.8, 5], [0.8, 1.2, 3], [1.2, 2.0, 1], [2.0, None, 0]])
score = _score_range(value, ranges)
if value < 0.3:
desc = "Deep value — miners under extreme stress"
elif value < 0.5:
desc = "Low — miners selling below average"
elif value < 0.8:
desc = "Below average miner revenue"
elif value < 1.2:
desc = "Average miner revenue"
elif value < 2.0:
desc = "Above average — miners profiting well"
else:
desc = "Elevated — potential top signal"
return score, desc
def score_mvrv_zscore(value, thresholds=None):
if value is None:
return None, "No data"
t = (thresholds or load_thresholds()).get("mvrv_zscore", {})
ranges = t.get("ranges", [[None, 0, 10], [0, 0.5, 8], [0.5, 1.5, 5], [1.5, 3, 2], [3, 5, 1], [5, None, 0]])
score = _score_range(value, ranges)
if value < 0:
desc = "Below realized value — historically perfect buy zone"
elif value < 0.5:
desc = "Near realized value — strong accumulation"
elif value < 1.5:
desc = "Fair value range"
elif value < 3:
desc = "Above fair value"
elif value < 5:
desc = "Overvalued territory"
else:
desc = "Extreme overvaluation — cycle top territory"
return score, desc
def score_drawdown(value, thresholds=None):
"""Score drawdown from ATH (value is % drawdown, e.g. 50 = 50% below ATH)."""
if value is None:
return None, "No data"
t = (thresholds or load_thresholds()).get("drawdown", {})
ranges = t.get("ranges", [[70, None, 10], [50, 70, 8], [30, 50, 6], [20, 30, 4], [10, 20, 2], [None, 10, 0]])
score = _score_range(value, ranges)
if value > 70:
desc = f"{value:.0f}% below ATH — extreme capitulation"
elif value > 50:
desc = f"{value:.0f}% below ATH — deep bear market"
elif value > 30:
desc = f"{value:.0f}% below ATH — significant correction"
elif value > 20:
desc = f"{value:.0f}% below ATH — moderate pullback"
elif value > 10:
desc = f"{value:.0f}% below ATH — minor dip"
else:
desc = f"{value:.0f}% below ATH — near all-time high"
return score, desc
def score_price_vs_200w_sma(price, sma_200w, thresholds=None):
"""Score price relative to 200-week SMA."""
if price is None or sma_200w is None or sma_200w == 0:
return None, "No data"
pct_above = ((price - sma_200w) / sma_200w) * 100
t = (thresholds or load_thresholds()).get("price_vs_200w_sma", {})
ranges = t.get("ranges", [[None, 0, 10], [0, 20, 6], [20, 50, 3], [50, 100, 1], [100, None, 0]])
score = _score_range(pct_above, ranges)
if pct_above < 0:
desc = f"Below 200W SMA — historically rare buy zone"
elif pct_above < 20:
desc = f"{pct_above:.0f}% above 200W SMA — good value"
elif pct_above < 50:
desc = f"{pct_above:.0f}% above 200W SMA — moderate"
elif pct_above < 100:
desc = f"{pct_above:.0f}% above 200W SMA — extended"
else:
desc = f"{pct_above:.0f}% above 200W SMA — extremely overheated"
return score, desc
def score_reserve_risk(value, thresholds=None):
if value is None:
return None, "No data"
t = (thresholds or load_thresholds()).get("reserve_risk", {})
ranges = t.get("ranges", [[None, 0.002, 10], [0.002, 0.005, 7], [0.005, 0.01, 4], [0.01, 0.02, 2], [0.02, None, 0]])
score = _score_range(value, ranges)
if value < 0.002:
desc = "Very low risk/reward — strong accumulation"
elif value < 0.005:
desc = "Low risk — good entry"
elif value < 0.01:
desc = "Moderate risk/reward"
elif value < 0.02:
desc = "Elevated risk"
else:
desc = "High risk — cycle top territory"
return score, desc
def score_rhodl_ratio(value, thresholds=None):
if value is None:
return None, "No data"
t = (thresholds or load_thresholds()).get("rhodl_ratio", {})
ranges = t.get("ranges", [[None, 100, 10], [100, 500, 7], [500, 2000, 4], [2000, 10000, 1], [10000, None, 0]])
score = _score_range(value, ranges)
if value < 100:
desc = "Extreme low — long-term holders dominate"
elif value < 500:
desc = "Low — mature holder confidence"
elif value < 2000:
desc = "Moderate rotation"
elif value < 10000:
desc = "Elevated — new money entering"
else:
desc = "Extreme — speculative mania"
return score, desc
def score_nupl(value, thresholds=None):
if value is None:
return None, "No data"
t = (thresholds or load_thresholds()).get("nupl", {})
ranges = t.get("ranges", [[None, 0, 10], [0, 0.25, 7], [0.25, 0.5, 4], [0.5, 0.75, 1], [0.75, None, 0]])
score = _score_range(value, ranges)
if value < 0:
desc = "Capitulation — holders underwater"
elif value < 0.25:
desc = "Hope/Fear — early recovery"
elif value < 0.5:
desc = "Optimism — moderate profit taking"
elif value < 0.75:
desc = "Belief/Greed — significant unrealized gains"
else:
desc = "Euphoria — extreme unrealized profit"
return score, desc
def score_lth_realized_price(price, lth_rp, thresholds=None):
"""Score price relative to Long-Term Holder realized price."""
if price is None or lth_rp is None or lth_rp == 0:
return None, "No data"
pct_above = ((price - lth_rp) / lth_rp) * 100
t = (thresholds or load_thresholds()).get("lth_realized_price", {})
ranges = t.get("ranges", [[None, 0, 10], [0, 20, 6], [20, 50, 3], [50, None, 1]])
score = _score_range(pct_above, ranges)
if pct_above < 0:
desc = f"Below LTH cost basis — LTHs underwater (extreme value)"
elif pct_above < 20:
desc = f"{pct_above:.0f}% above LTH cost basis — good value"
elif pct_above < 50:
desc = f"{pct_above:.0f}% above LTH cost basis — moderate"
else:
desc = f"{pct_above:.0f}% above LTH cost basis — extended"
return score, desc
def score_hash_ribbons(data, thresholds=None):
"""Score hash ribbons based on buy signal detection."""
if not data:
return None, "No data"
if data.get("buy_signal"):
return 10, "Active buy signal — miner capitulation recovery"
return 3, "Normal mining activity"
def score_all(metrics):
"""Score all metrics and return individual + composite scores."""
thresholds = load_thresholds()
results = []
# Fear & Greed
fg = metrics.get("fear_greed", {})
fg_score, fg_desc = score_fear_greed(fg.get("value"), thresholds)
results.append({
"name": "Fear & Greed Index",
"key": "fear_greed",
"value": fg.get("value"),
"display_value": f"{fg.get('value', 'N/A')}{fg.get('classification', '')}",
"score": fg_score,
"description": fg_desc,
"recent": fg.get("recent", []),
})
# Puell Multiple
pm = metrics.get("puell_multiple", {})
pm_score, pm_desc = score_puell_multiple(pm.get("value"), thresholds)
results.append({
"name": "Puell Multiple",
"key": "puell_multiple",
"value": pm.get("value"),
"display_value": f"{pm.get('value', 'N/A'):.4f}" if pm.get("value") is not None else "N/A",
"score": pm_score,
"description": pm_desc,
"recent": pm.get("recent", []),
})
# MVRV Z-Score
mz = metrics.get("mvrv_zscore", {})
mz_score, mz_desc = score_mvrv_zscore(mz.get("value"), thresholds)
results.append({
"name": "MVRV Z-Score",
"key": "mvrv_zscore",
"value": mz.get("value"),
"display_value": f"{mz.get('value', 'N/A'):.2f}" if mz.get("value") is not None else "N/A",
"score": mz_score,
"description": mz_desc,
"recent": mz.get("recent", []),
})
# Drawdown from ATH
dd = metrics.get("drawdown", {})
dd_score, dd_desc = score_drawdown(dd.get("value"), thresholds)
results.append({
"name": "Drawdown from ATH",
"key": "drawdown",
"value": dd.get("value"),
"display_value": f"{dd.get('value', 0):.1f}%" if dd.get("value") is not None else "N/A",
"score": dd_score,
"description": dd_desc,
"recent": [],
})
# Price vs 200W SMA
sma = metrics.get("200w_sma", {})
price_data = metrics.get("price", {})
current_price = price_data.get("price") or sma.get("btc_price")
sma_val = sma.get("value")
sma_score, sma_desc = score_price_vs_200w_sma(current_price, sma_val, thresholds)
results.append({
"name": "Price vs 200W SMA",
"key": "price_vs_200w_sma",
"value": sma_val,
"display_value": f"${sma_val:,.0f}" if sma_val else "N/A",
"score": sma_score,
"description": sma_desc,
"recent": sma.get("recent", []),
})
# Reserve Risk
rr = metrics.get("reserve_risk", {})
rr_score, rr_desc = score_reserve_risk(rr.get("value"), thresholds)
results.append({
"name": "Reserve Risk",
"key": "reserve_risk",
"value": rr.get("value"),
"display_value": f"{rr.get('value', 'N/A'):.6f}" if rr.get("value") is not None else "N/A",
"score": rr_score,
"description": rr_desc,
"recent": rr.get("recent", []),
})
# RHODL Ratio
rh = metrics.get("rhodl_ratio", {})
rh_score, rh_desc = score_rhodl_ratio(rh.get("value"), thresholds)
results.append({
"name": "RHODL Ratio",
"key": "rhodl_ratio",
"value": rh.get("value"),
"display_value": f"{rh.get('value', 'N/A'):.0f}" if rh.get("value") is not None else "N/A",
"score": rh_score,
"description": rh_desc,
"recent": rh.get("recent", []),
})
# NUPL
nu = metrics.get("nupl", {})
nu_score, nu_desc = score_nupl(nu.get("value"), thresholds)
results.append({
"name": "Net Unrealized Profit/Loss",
"key": "nupl",
"value": nu.get("value"),
"display_value": f"{nu.get('value', 'N/A'):.4f}" if nu.get("value") is not None else "N/A",
"score": nu_score,
"description": nu_desc,
"recent": nu.get("recent", []),
})
# LTH Realized Price
lth = metrics.get("lth_realized_price", {})
lth_price = lth.get("btc_price") or current_price
lth_rp = lth.get("value")
lth_score, lth_desc = score_lth_realized_price(lth_price, lth_rp, thresholds)
results.append({
"name": "LTH Realized Price",
"key": "lth_realized_price",
"value": lth_rp,
"display_value": f"${lth_rp:,.0f}" if lth_rp else "N/A",
"score": lth_score,
"description": lth_desc,
"recent": lth.get("recent", []),
})
# Hash Ribbons
hr = metrics.get("hash_ribbons", {})
hr_score, hr_desc = score_hash_ribbons(hr, thresholds)
results.append({
"name": "Hash Ribbons",
"key": "hash_ribbons",
"value": None,
"display_value": "Buy Signal" if hr.get("buy_signal") else "Normal",
"score": hr_score,
"description": hr_desc,
"recent": [],
})
# Compute composite
valid_scores = [r["score"] for r in results if r["score"] is not None]
if valid_scores:
# Scale to 0-100 based on available metrics
composite = sum(valid_scores) / len(valid_scores) * 10
else:
composite = 0
# Assessment text
if composite >= 71:
assessment = "STRONG ACCUMULATION ZONE"
elif composite >= 51:
assessment = "MODERATE OPPORTUNITY"
elif composite >= 31:
assessment = "NEUTRAL"
elif composite >= 15:
assessment = "CAUTION — OVERHEATED"
else:
assessment = "EXTREME CAUTION"
return {
"metrics": results,
"composite_score": round(composite, 1),
"assessment": assessment,
"scored_count": len(valid_scores),
"total_count": len(results),
}

0
scrapers/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

34
scrapers/fear_greed.py Normal file
View File

@ -0,0 +1,34 @@
"""Fear & Greed Index from alternative.me API."""
import logging
import requests
log = logging.getLogger(__name__)
FNG_URL = "https://api.alternative.me/fng/?limit=30"
def fetch():
"""Fetch Fear & Greed data. Returns dict with value, classification, and recent history."""
try:
resp = requests.get(FNG_URL, timeout=15)
resp.raise_for_status()
data = resp.json()
entries = data.get("data", [])
if not entries:
return {"value": None, "error": "No data"}
current = entries[0]
value = int(current["value"])
classification = current.get("value_classification", "")
recent = [int(e["value"]) for e in entries[:30]]
return {
"value": value,
"classification": classification,
"recent": recent,
}
except Exception as e:
log.error("Fear & Greed fetch error: %s", e)
return {"value": None, "error": str(e)}

265
scrapers/lookintobitcoin.py Normal file
View File

@ -0,0 +1,265 @@
"""Playwright scraper for LookIntoBitcoin / BitcoinMagazinePro charts."""
import logging
import traceback
log = logging.getLogger(__name__)
BASE_URL = "https://www.lookintobitcoin.com"
CHARTS = {
"puell_multiple": {
"path": "/charts/puell-multiple/",
"traces": ["Puell Multiple"],
},
"mvrv_zscore": {
"path": "/charts/mvrv-zscore/",
"traces": ["Z-Score"],
},
"reserve_risk": {
"path": "/charts/reserve-risk/",
"traces": ["Reserve Risk"],
},
"rhodl_ratio": {
"path": "/charts/rhodl-ratio/",
"traces": ["RHODL Ratio"],
},
"nupl": {
"path": "/charts/relative-unrealized-profit--loss/",
"traces": ["NUPL"],
},
"200w_sma": {
"path": "/charts/200-week-moving-average-heatmap/",
"traces": ["200 Week Moving Average"],
},
"lth_realized_price": {
"path": "/charts/long-term-holder-realized-price/",
"traces": ["Long-Term Holder Realized Price", "BTC Price"],
},
"hash_ribbons": {
"path": "/charts/hash-ribbons/",
"traces": None,
},
"pi_cycle_bottom": {
"path": "/charts/pi-cycle-top-bottom-indicator/",
"traces": None,
},
"lth_supply": {
"path": "/charts/long-term-holder-supply/",
"traces": None,
},
}
def scrape_chart(chart_path, timeout=25000):
"""Scrape a single chart from LookIntoBitcoin. Returns list of trace dicts or None."""
from playwright.sync_api import sync_playwright
store = {"data": None}
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
def handle_response(response):
if "_dash-update-component" in response.url:
try:
store["data"] = response.json()
except Exception:
pass
page.on("response", handle_response)
try:
page.goto(f"{BASE_URL}{chart_path}", timeout=timeout)
page.wait_for_timeout(6000)
except Exception as e:
log.warning("Navigation error for %s: %s", chart_path, e)
finally:
browser.close()
if store["data"]:
try:
return store["data"]["response"]["chart"]["figure"]["data"]
except (KeyError, TypeError):
# Try alternate response structures
try:
resp = store["data"]
if isinstance(resp, dict):
for key in resp:
val = resp[key]
if isinstance(val, dict) and "figure" in val:
return val["figure"]["data"]
if isinstance(val, dict) and "chart" in val:
return val["chart"]["figure"]["data"]
except Exception:
pass
return None
def _find_trace(traces, name):
"""Find a trace by name (case-insensitive partial match)."""
if not traces:
return None
name_lower = name.lower()
# First pass: exact or substring match
for t in traces:
trace_name = t.get("name", "").lower()
if name_lower in trace_name or trace_name in name_lower:
return t
# Second pass: check if all words in name appear in trace name
words = name_lower.split()
for t in traces:
trace_name = t.get("name", "").lower()
if all(w in trace_name for w in words):
return t
return None
def _get_latest_value(trace):
"""Get the most recent non-null y value from a trace."""
if not trace:
return None
y = trace.get("y", [])
for val in reversed(y):
if val is not None:
try:
return float(val)
except (ValueError, TypeError):
continue
return None
def _get_recent_values(trace, n=30):
"""Get the last n non-null values from a trace."""
if not trace:
return []
y = trace.get("y", [])
values = []
for val in reversed(y):
if val is not None:
try:
values.append(float(val))
except (ValueError, TypeError):
continue
if len(values) >= n:
break
values.reverse()
return values
def scrape_all():
"""Scrape all charts and return parsed metric values."""
results = {}
for metric_key, chart_info in CHARTS.items():
log.info("Scraping %s ...", metric_key)
try:
traces = scrape_chart(chart_info["path"])
if not traces:
log.warning("No data for %s", metric_key)
results[metric_key] = {"value": None, "error": "No data returned"}
continue
wanted = chart_info.get("traces")
if metric_key == "puell_multiple":
t = _find_trace(traces, "Puell Multiple")
val = _get_latest_value(t)
results[metric_key] = {
"value": val,
"recent": _get_recent_values(t),
}
elif metric_key == "mvrv_zscore":
t = _find_trace(traces, "Z-Score")
val = _get_latest_value(t)
results[metric_key] = {
"value": val,
"recent": _get_recent_values(t),
}
elif metric_key == "200w_sma":
t = _find_trace(traces, "200 Week Moving Average") or _find_trace(traces, "200 Week MA") or _find_trace(traces, "200W")
val = _get_latest_value(t)
# Also try to find BTC price trace
price_t = _find_trace(traces, "BTC Price") or _find_trace(traces, "Price")
price_val = _get_latest_value(price_t)
results[metric_key] = {
"value": val,
"btc_price": price_val,
"recent": _get_recent_values(t),
}
elif metric_key == "lth_realized_price":
lth_t = _find_trace(traces, "Long-Term Holder Realized Price") or _find_trace(traces, "LTH Realized Price") or _find_trace(traces, "LTH")
price_t = _find_trace(traces, "BTC Price") or _find_trace(traces, "Price")
lth_val = _get_latest_value(lth_t)
price_val = _get_latest_value(price_t)
results[metric_key] = {
"value": lth_val,
"btc_price": price_val,
"recent": _get_recent_values(lth_t),
}
elif metric_key == "hash_ribbons":
# Look for buy/sell signal traces or MA crossover
results[metric_key] = {
"traces": [
{"name": t.get("name", ""), "latest": _get_latest_value(t)}
for t in traces[:6]
],
"value": None,
}
# Try to detect buy signal from trace names/colors
for t in traces:
name = t.get("name", "").lower()
if "buy" in name or "signal" in name:
results[metric_key]["buy_signal"] = True
break
elif metric_key == "lth_supply":
# Get main supply trace
t = traces[0] if traces else None
for candidate in traces:
name = candidate.get("name", "").lower()
if "supply" in name or "lth" in name:
t = candidate
break
recent = _get_recent_values(t, 60)
# Determine trend: compare recent avg to older avg
trend = None
if len(recent) >= 30:
old_avg = sum(recent[:15]) / 15
new_avg = sum(recent[-15:]) / 15
trend = "increasing" if new_avg > old_avg else "decreasing"
results[metric_key] = {
"value": _get_latest_value(t),
"trend": trend,
"recent": _get_recent_values(t),
}
else:
# Generic: grab first non-layout trace with numeric data
t = None
if wanted:
for name in wanted:
t = _find_trace(traces, name)
if t:
break
if not t:
for candidate in traces:
y = candidate.get("y", [])
if y and any(v is not None for v in y[-10:]):
t = candidate
break
val = _get_latest_value(t)
results[metric_key] = {
"value": val,
"recent": _get_recent_values(t),
}
except Exception as e:
log.error("Error scraping %s: %s\n%s", metric_key, e, traceback.format_exc())
results[metric_key] = {"value": None, "error": str(e)}
return results

80
scrapers/price.py Normal file
View File

@ -0,0 +1,80 @@
"""BTC price data from CoinGecko API."""
import logging
import requests
log = logging.getLogger(__name__)
PRICE_URL = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_24hr_change=true"
HISTORY_URL = "https://api.coingecko.com/api/v3/coins/bitcoin/market_chart?vs_currency=usd&days=365"
ATH_URL = "https://api.coingecko.com/api/v3/coins/bitcoin?localization=false&tickers=false&market_data=true&community_data=false&developer_data=false"
def fetch_current():
"""Fetch current BTC price and 24h change."""
try:
resp = requests.get(PRICE_URL, timeout=15)
resp.raise_for_status()
data = resp.json()
btc = data.get("bitcoin", {})
return {
"price": btc.get("usd"),
"change_24h": btc.get("usd_24h_change"),
}
except Exception as e:
log.error("Price fetch error: %s", e)
return {"price": None, "error": str(e)}
def fetch_historical():
"""Fetch 365 days of BTC price history. Returns list of [timestamp, price]."""
try:
resp = requests.get(HISTORY_URL, timeout=30)
resp.raise_for_status()
data = resp.json()
prices = data.get("prices", [])
return prices
except Exception as e:
log.error("Historical price fetch error: %s", e)
return []
def fetch_ath():
"""Fetch BTC all-time high from CoinGecko."""
try:
resp = requests.get(ATH_URL, timeout=15)
resp.raise_for_status()
data = resp.json()
market = data.get("market_data", {})
ath = market.get("ath", {}).get("usd")
ath_change = market.get("ath_change_percentage", {}).get("usd")
return {
"ath": ath,
"ath_change_pct": ath_change,
}
except Exception as e:
log.error("ATH fetch error: %s", e)
return {"ath": None, "error": str(e)}
def calculate_200d_sma(prices):
"""Calculate 200-day SMA from historical price data."""
if not prices or len(prices) < 200:
return None
# prices is [[timestamp, price], ...]
recent_200 = [p[1] for p in prices[-200:]]
return sum(recent_200) / len(recent_200)
def calculate_mayer_multiple(current_price, sma_200d):
"""Mayer Multiple = current price / 200-day SMA."""
if not current_price or not sma_200d or sma_200d == 0:
return None
return current_price / sma_200d
def calculate_drawdown(current_price, ath):
"""Drawdown from ATH as percentage."""
if not current_price or not ath or ath == 0:
return None
return (ath - current_price) / ath * 100