btc-accumulation-monitor/ml_engine/train_and_backtest.py
BizzleBot 560863fa0d pivot: rewrite as BTC accumulation signal optimizer
Replace day-trading bot with long-term accumulation signal model.
Predicts optimal BUY times using forward return analysis at 7d/30d/90d
horizons, scoring each candle 0-100. Primary metric is now
cost_basis_improvement_pct (model buy price vs DCA).

- train_and_backtest.py: regression models (XGBoost/LSTM hybrid),
  accumulation-focused features (price position, momentum, volatility,
  volume, cycle), forward return targets, signal quality backtesting
- orchestrator.py: cost improvement scoring, signal count validation
- analyzer.py: accumulation-focused LLM system prompt
- dashboard: cost improvement display, signal metrics table
- config: new accumulation-focused parameters

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 23:51:43 +00:00

1043 lines
39 KiB
Python
Executable File

#!/usr/bin/env python3
"""
BTC Accumulation Signal Optimizer -- Train & Backtest Engine
Self-contained script that runs on the Windows PC with GPU.
Predicts the best times to BUY BTC for long-term holding by scoring
each candle with an Accumulation Score (0-100).
Usage:
python train_and_backtest.py --config config.json --data btc_4h.csv --output results.json
"""
import argparse
import json
import sys
import warnings
import numpy as np
import pandas as pd
from datetime import datetime
import ta
from ta.momentum import RSIIndicator, StochasticOscillator, WilliamsRIndicator, ROCIndicator
from ta.trend import MACD, CCIIndicator, SMAIndicator, EMAIndicator
from ta.volatility import BollingerBands, AverageTrueRange
from ta.volume import OnBalanceVolumeIndicator
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
# Suppress every Python warning (all categories, all modules) for the rest of
# the process.
# NOTE(review): this also hides deprecation/convergence warnings raised by
# pandas, sklearn or the boosting libraries; consider narrowing the filter.
warnings.filterwarnings("ignore")
# ---------------------------------------------------------------------------
# Feature Engineering
# ---------------------------------------------------------------------------
def compute_features(df, config):
    """Add accumulation-focused feature columns to an OHLCV DataFrame.

    Reads the ``open``, ``high``, ``low``, ``close`` and ``volume`` columns and
    writes the new feature columns into ``df`` in place; the same object is
    returned. Feature groups are switched by ``config["features"]`` flags
    (``use_price_position``, ``use_momentum``, ``use_volatility``,
    ``use_volume``, ``use_cycle``; all default True). Every rolling/expanding
    window only looks at the current and earlier rows.

    One-year look-backs and volatility annualisation follow
    ``config["timeframe"]`` the same way ``create_accumulation_target`` does:
    ``"1h"`` means 8760 candles per year, anything else is treated as 4h
    candles (2190 per year).
    """
    feat = config.get("features", {})
    candles_per_year = 8760 if config.get("timeframe", "4h") == "1h" else 2190
    c, h, l, o, v = df["close"], df["high"], df["low"], df["open"], df["volume"]

    def _pct_rank_of_last(window):
        # Same value as pd.Series(window).rank(pct=True).iloc[-1] (average rank
        # for ties, NaNs ignored) without allocating a Series per window.
        last = window[-1]
        if np.isnan(last):
            return np.nan
        valid = window[~np.isnan(window)]
        below = np.count_nonzero(valid < last)
        ties = np.count_nonzero(valid == last)
        return (below + (ties + 1) / 2.0) / valid.size

    # --- Price Position ---
    if feat.get("use_price_position", True):
        # Distance (%) from the highest close seen so far in the dataset.
        rolling_ath = c.expanding().max()
        df["dist_from_ath_pct"] = (c - rolling_ath) / rolling_ath * 100
        # Distance (%) from the trailing one-year high / low.
        period_52w = min(candles_per_year, len(df) - 1)
        if period_52w > 50:
            rolling_52w_high = h.rolling(period_52w, min_periods=50).max()
            rolling_52w_low = l.rolling(period_52w, min_periods=50).min()
            df["dist_from_52w_high_pct"] = (c - rolling_52w_high) / rolling_52w_high * 100
            df["dist_from_52w_low_pct"] = (c - rolling_52w_low) / rolling_52w_low * 100
        # Price vs SMA(50) / SMA(200), plus the SMA spread (golden/death cross).
        sma50 = SMAIndicator(c, window=50).sma_indicator()
        sma200 = SMAIndicator(c, window=200).sma_indicator()
        df["SMA_50"] = sma50
        df["SMA_200"] = sma200
        df["price_vs_sma50_pct"] = (c - sma50) / sma50 * 100
        df["price_vs_sma200_pct"] = (c - sma200) / sma200 * 100
        df["sma50_vs_sma200"] = (sma50 - sma200) / sma200 * 100
        # Percentile rank (0-1) of the current close within the trailing year.
        period_365 = min(candles_per_year, len(df) - 1)
        if period_365 > 50:
            df["price_percentile_365"] = c.rolling(period_365, min_periods=50).apply(
                _pct_rank_of_last, raw=True
            )
        # Additional short SMAs.
        for p in [10, 20]:
            sma = SMAIndicator(c, window=p).sma_indicator()
            df[f"price_vs_sma{p}_pct"] = (c - sma) / sma * 100

    # --- Momentum / Oversold ---
    if feat.get("use_momentum", True):
        df["RSI_14"] = RSIIndicator(c, window=14).rsi()
        df["RSI_7"] = RSIIndicator(c, window=7).rsi()
        macd = MACD(c, window_slow=26, window_fast=12, window_sign=9)
        df["MACD_line"] = macd.macd()
        df["MACD_signal"] = macd.macd_signal()
        df["MACD_hist"] = macd.macd_diff()
        stoch = StochasticOscillator(h, l, c, window=14, smooth_window=3)
        df["stoch_k"] = stoch.stoch()
        df["stoch_d"] = stoch.stoch_signal()
        df["williams_r"] = WilliamsRIndicator(h, l, c, lbp=14).williams_r()
        df["ROC_30"] = ROCIndicator(c, window=30).roc()
        df["ROC_90"] = ROCIndicator(c, window=90).roc()

    # --- Volatility / Fear ---
    if feat.get("use_volatility", True):
        bb = BollingerBands(c, window=20, window_dev=2)
        df["BB_width"] = (bb.bollinger_hband() - bb.bollinger_lband()) / c
        df["BB_pctb"] = bb.bollinger_pband()
        df["price_vs_lower_bb"] = (c - bb.bollinger_lband()) / c
        atr = AverageTrueRange(h, l, c, window=14)
        df["ATR_14"] = atr.average_true_range()
        df["ATR_pct"] = df["ATR_14"] / c * 100
        # Length of the current run of red (close < open) candles; 0 on green ones.
        is_red = (c < o).astype(int)
        df["consecutive_red"] = is_red.groupby(
            (is_red != is_red.shift()).cumsum()
        ).cumsum() * is_red
        # Drawdown (%) from the highest close of the last 30 candles.
        if len(df) > 30:
            rolling_max_30 = c.rolling(30, min_periods=1).max()
            df["drawdown_30"] = (c - rolling_max_30) / rolling_max_30 * 100
        # 20-candle std of returns, annualised with the timeframe's candles/year.
        df["hist_volatility_20"] = c.pct_change().rolling(20).std() * np.sqrt(candles_per_year)

    # --- Volume ---
    if feat.get("use_volume", True):
        df["OBV"] = OnBalanceVolumeIndicator(c, v).on_balance_volume()
        vol_sma20 = v.rolling(20).mean()
        df["volume_ratio"] = v / vol_sma20
        # Rolling-20 ratio of volume on red candles to volume on green candles.
        is_green = (c >= o).astype(float)
        is_red_f = (c < o).astype(float)
        green_vol = (v * is_green).rolling(20).sum()
        red_vol = (v * is_red_f).rolling(20).sum()
        df["red_green_vol_ratio"] = red_vol / (green_vol + 1e-10)
        # OBV change over 20 candles, relative to its 20-candle mean magnitude.
        df["OBV_slope"] = df["OBV"].diff(20) / (df["OBV"].rolling(20).mean().abs() + 1e-10)

    # --- Cycle ---
    if feat.get("use_cycle", True):
        # SMA(50) vs SMA(200) regime; only computed here if price_position was off.
        if "sma50_vs_sma200" not in df.columns:
            sma50 = SMAIndicator(c, window=50).sma_indicator()
            sma200 = SMAIndicator(c, window=200).sma_indicator()
            df["sma50_vs_sma200"] = (sma50 - sma200) / sma200 * 100
        # Candles since the close was last >20% below its running peak. Counts
        # restart at 0 on the first candle of each non-drawdown run (and at the
        # first row) and stay 0 while in drawdown.
        rolling_peak = c.expanding().max()
        drawdown_from_peak = (c - rolling_peak) / rolling_peak
        major_dd = (drawdown_from_peak < -0.20).astype(int)
        dd_groups = major_dd.groupby((major_dd != major_dd.shift()).cumsum())
        df["candles_since_major_dd"] = dd_groups.cumcount()
        df.loc[major_dd == 1, "candles_since_major_dd"] = 0
    return df
# ---------------------------------------------------------------------------
# Target: Accumulation Score
# ---------------------------------------------------------------------------
def create_accumulation_target(df, config):
    """Create the accumulation score target (0-100) from forward returns.

    For every candle the % return to each forward horizon is computed, each
    horizon's returns are percentile-ranked across all candles that have one,
    and the ranks are combined with the normalised ``target.weights``. Higher
    scores mark candles that were better long-term buy points.

    Horizons come from ``target.forward_periods_1h`` when
    ``config["timeframe"] == "1h"`` and ``target.forward_periods_4h`` otherwise.
    Candles lacking a forward return for *any* horizon get NaN.

    Raises:
        ValueError: if the weights and horizons differ in length, or the
            weights do not sum to a positive value.
    """
    from scipy.stats import rankdata

    tgt = config.get("target", {})
    timeframe = config.get("timeframe", "4h")
    if timeframe == "1h":
        forward_periods = tgt.get("forward_periods_1h", [168, 720, 2160])
    else:
        forward_periods = tgt.get("forward_periods_4h", [42, 180, 540])
    weights = tgt.get("weights", [0.2, 0.3, 0.5])
    # zip() would silently drop unmatched horizons or weights.
    if len(weights) != len(forward_periods):
        raise ValueError(
            f"target.weights has {len(weights)} entries but "
            f"{len(forward_periods)} forward periods are configured"
        )
    w_sum = sum(weights)
    if w_sum <= 0:
        raise ValueError("target.weights must sum to a positive value")
    weights = [w / w_sum for w in weights]

    close = df["close"].to_numpy(dtype=float)
    n = len(close)
    score = np.zeros(n)
    valid = np.ones(n, dtype=bool)
    for period, weight in zip(forward_periods, weights):
        # Forward % return; the last `period` candles have no future price.
        fwd = np.full(n, np.nan)
        if 0 <= period < n:
            base = close[:n - period]
            fwd[:n - period] = (close[period:] - base) / base * 100
        has_fwd = ~np.isnan(fwd)
        # Percentile rank in (0, 1]; higher = better future return.
        ranks = np.zeros(n)
        if has_fwd.any():
            ranks[has_fwd] = rankdata(fwd[has_fwd], method="average") / has_fwd.sum()
        score += weight * ranks
        valid &= has_fwd

    score = score * 100
    score[~valid] = np.nan
    return pd.Series(score, index=df.index, name="target")
# ---------------------------------------------------------------------------
# LSTM Regressor (PyTorch)
# ---------------------------------------------------------------------------
def get_device():
    """Return the torch device to train on: CUDA if available, otherwise CPU."""
    import torch

    device_name = "cuda" if torch.cuda.is_available() else "cpu"
    return torch.device(device_name)
class LSTMRegressor:
    """PyTorch LSTM that regresses accumulation scores from windows of feature rows.

    Exposes a scikit-learn-like ``fit`` / ``predict`` interface. Hyperparameters
    read from ``hp`` (defaults in parentheses): ``lstm_sequence_length`` (30),
    ``lstm_hidden_size`` (128), ``lstm_num_layers`` (2), ``lstm_dropout`` (0.3),
    ``learning_rate`` (0.001), ``lstm_epochs`` (100), ``lstm_batch_size`` (64),
    ``lstm_patience`` (10).

    Output k corresponds to input row ``sequence_length + k`` and is computed
    from rows ``[k, k + sequence_length)``, so the window ends one row before
    the row being scored.
    """

    def __init__(self, input_size, hp):
        import torch
        import torch.nn as nn

        self.hp = hp
        self.device = get_device()
        self.sequence_length = hp.get("lstm_sequence_length", 30)
        n_hidden = hp.get("lstm_hidden_size", 128)
        n_layers = hp.get("lstm_num_layers", 2)
        p_drop = hp.get("lstm_dropout", 0.3)
        # nn.LSTM only applies its inter-layer dropout when stacking >1 layer.
        stacked_drop = p_drop if n_layers > 1 else 0.0

        class _LSTMNet(nn.Module):
            def __init__(net):
                super().__init__()
                net.lstm = nn.LSTM(
                    input_size=input_size,
                    hidden_size=n_hidden,
                    num_layers=n_layers,
                    batch_first=True,
                    dropout=stacked_drop,
                )
                net.dropout = nn.Dropout(p_drop)
                net.fc = nn.Linear(n_hidden, 1)

            def forward(net, x):
                seq_out, _ = net.lstm(x)
                # Only the final time step feeds the regression head.
                return net.fc(net.dropout(seq_out[:, -1, :])).squeeze(-1)

        self.model = _LSTMNet().to(self.device)
        self.feature_importances_ = None

    def _make_sequences(self, X, y=None):
        """Stack overlapping ``sequence_length``-row windows into device tensors.

        Window k covers ``X[k:k + sequence_length]`` and, when ``y`` is given,
        is paired with ``y[k + sequence_length]``. Returns the sequence tensor,
        or ``(sequences, targets)`` when ``y`` is not None.
        """
        import torch

        width = self.sequence_length
        ends = range(width, len(X))
        windows = np.array([X[end - width:end] for end in ends])
        seq_tensor = torch.FloatTensor(windows).to(self.device)
        if y is None:
            return seq_tensor
        labels = np.array([y[end] for end in ends])
        return seq_tensor, torch.FloatTensor(labels).to(self.device)

    def fit(self, X_train, y_train, X_val=None, y_val=None):
        """Train with Adam + MSE and gradient clipping (max norm 1.0).

        When a non-empty validation set is supplied, training stops after
        ``lstm_patience`` epochs without validation improvement and the best
        weights seen are restored at the end.
        """
        import torch
        import torch.nn as nn

        cfg = self.hp
        epochs = cfg.get("lstm_epochs", 100)
        batch = cfg.get("lstm_batch_size", 64)
        patience = cfg.get("lstm_patience", 10)
        optimizer = torch.optim.Adam(self.model.parameters(), lr=cfg.get("learning_rate", 0.001))
        loss_fn = nn.MSELoss()

        seqs, labels = self._make_sequences(X_train, y_train)
        use_val = False
        if X_val is not None and y_val is not None:
            val_seqs, val_labels = self._make_sequences(X_val, y_val)
            use_val = len(val_seqs) > 0

        best_loss = float("inf")
        stale_epochs = 0
        best_weights = None
        self.model.train()
        n_samples = len(seqs)
        for epoch in range(epochs):
            order = torch.randperm(n_samples)
            seqs_shuf = seqs[order]
            labels_shuf = labels[order]
            running = 0.0
            steps = 0
            for lo in range(0, n_samples, batch):
                hi = min(lo + batch, n_samples)
                optimizer.zero_grad()
                batch_loss = loss_fn(self.model(seqs_shuf[lo:hi]), labels_shuf[lo:hi])
                batch_loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                optimizer.step()
                running += batch_loss.item()
                steps += 1
            avg_loss = running / max(steps, 1)
            report = (epoch + 1) % 20 == 0

            if not use_val:
                if report:
                    print(f" Epoch {epoch+1}/{epochs}: loss={avg_loss:.4f}")
                continue

            self.model.eval()
            with torch.no_grad():
                val_loss = loss_fn(self.model(val_seqs), val_labels).item()
            self.model.train()

            if val_loss < best_loss:
                best_loss = val_loss
                stale_epochs = 0
                best_weights = {name: t.clone() for name, t in self.model.state_dict().items()}
            else:
                stale_epochs += 1
                if stale_epochs >= patience:
                    print(f" LSTM early stop at epoch {epoch+1}, val_loss={val_loss:.4f}")
                    break
            if report:
                print(f" Epoch {epoch+1}/{epochs}: loss={avg_loss:.4f}, val_loss={val_loss:.4f}")

        if best_weights is not None:
            self.model.load_state_dict(best_weights)

    def predict(self, X):
        """Score every full window in ``X``; values are clipped to [0, 100].

        Returns ``len(X) - sequence_length`` predictions when
        ``len(X) > sequence_length``.
        """
        import torch

        self.model.eval()
        windows = self._make_sequences(X)
        with torch.no_grad():
            raw = self.model(windows).cpu().numpy()
        return np.clip(raw, 0, 100)
# ---------------------------------------------------------------------------
# Model Building
# ---------------------------------------------------------------------------
def build_model(config, input_size=0):
    """Instantiate the regressor named by ``config["model_type"]``.

    Supported types: ``"lstm"`` (needs ``input_size``), ``"xgboost"``
    (default), ``"lightgbm"`` and ``"catboost"``. Hyperparameters come from
    ``config["hyperparameters"]``. GPU acceleration is used when available.

    Raises:
        ValueError: for ``"ensemble"`` (use ``"hybrid"``, which is handled by
            the training helpers) or any unknown model type.
    """
    model_type = config.get("model_type", "xgboost")
    hp = config.get("hyperparameters", {})
    if model_type == "lstm":
        return LSTMRegressor(input_size, hp)

    if model_type == "xgboost":
        import xgboost as xgb
        params = {
            "learning_rate": hp.get("learning_rate", 0.01),
            "max_depth": hp.get("max_depth", 5),
            "n_estimators": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "colsample_bytree": hp.get("colsample_bytree", 0.8),
            "min_child_weight": hp.get("min_child_weight", 10),
            "gamma": hp.get("gamma", 0.3),
            "reg_alpha": hp.get("reg_alpha", 0.5),
            "reg_lambda": hp.get("reg_lambda", 3.0),
            "objective": "reg:squarederror",
            "eval_metric": "rmse",
            "random_state": 42,
            "device": "cuda" if _cuda_available() else "cpu",
            "verbosity": 0,
        }
        return xgb.XGBRegressor(**params)

    if model_type == "lightgbm":
        import lightgbm as lgb
        params = {
            "learning_rate": hp.get("learning_rate", 0.01),
            "max_depth": hp.get("max_depth", 5),
            "n_estimators": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            # LightGBM ignores `subsample` (bagging_fraction) unless bagging is
            # enabled with a non-zero frequency.
            "subsample_freq": hp.get("subsample_freq", 1),
            "colsample_bytree": hp.get("colsample_bytree", 0.8),
            "min_child_samples": hp.get("min_child_weight", 10),
            "reg_alpha": hp.get("reg_alpha", 0.5),
            "reg_lambda": hp.get("reg_lambda", 3.0),
            "objective": "regression",
            "metric": "rmse",
            "random_state": 42,
            "verbose": -1,
        }
        params["device"] = _lightgbm_device(lgb)
        return lgb.LGBMRegressor(**params)

    if model_type == "catboost":
        from catboost import CatBoostRegressor
        use_gpu = _cuda_available()
        params = {
            "learning_rate": hp.get("learning_rate", 0.01),
            "depth": hp.get("max_depth", 5),
            "iterations": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "l2_leaf_reg": hp.get("reg_lambda", 3.0),
            "loss_function": "RMSE",
            "random_seed": 42,
            "verbose": 0,
            "task_type": "GPU" if use_gpu else "CPU",
        }
        if use_gpu:
            # CatBoost's GPU default bootstrap (Bayesian) rejects `subsample`;
            # Bernoulli supports it on both CPU and GPU.
            params["bootstrap_type"] = "Bernoulli"
        return CatBoostRegressor(**params)

    if model_type == "ensemble":
        raise ValueError("Use 'hybrid' instead of 'ensemble' for accumulation mode")
    raise ValueError(f"Unknown model_type: {model_type}")


def _cuda_available():
    """Return True when PyTorch is importable and reports a CUDA device."""
    try:
        import torch
    except ImportError:
        return False
    return torch.cuda.is_available()


def _lightgbm_device(lgb):
    """Return "gpu" if this LightGBM build can train on a GPU, else "cpu".

    The estimator constructor only stores parameters; an unsupported device is
    reported when ``fit`` runs, so probe with a throwaway one-tree fit.
    """
    X_probe = np.arange(64, dtype=float).reshape(32, 2)
    y_probe = np.arange(32, dtype=float)
    try:
        lgb.LGBMRegressor(device="gpu", n_estimators=1, verbose=-1).fit(X_probe, y_probe)
    except Exception:
        return "cpu"
    return "gpu"
# ---------------------------------------------------------------------------
# Scaling & PCA
# ---------------------------------------------------------------------------
def apply_scaling_pca(X_train, X_val, X_test, config):
    """Fit optional StandardScaler / PCA on the training rows and apply to all splits.

    Controlled by ``config["features"]``: ``use_scaler`` (default True),
    ``use_pca`` (default False) and ``pca_variance`` (default 0.95, the
    fraction of variance PCA keeps). Transformers are fit on ``X_train`` only.

    ``X_val`` may be None and is then returned as None. An empty validation
    array is not passed to the transformers, because sklearn rejects 0-sample
    input. It is returned as an empty array with the transformed column count.

    Returns:
        ``(X_train, X_val, X_test, scaler, pca)``; ``scaler`` / ``pca`` are None
        when the corresponding step is disabled.
    """
    feat_cfg = config.get("features", {})
    val_has_rows = X_val is not None and len(X_val) > 0
    scaler = None
    pca = None
    if feat_cfg.get("use_scaler", True):
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        if val_has_rows:
            X_val = scaler.transform(X_val)
        X_test = scaler.transform(X_test)
    if feat_cfg.get("use_pca", False):
        variance = feat_cfg.get("pca_variance", 0.95)
        pca = PCA(n_components=variance, svd_solver="full")
        X_train = pca.fit_transform(X_train)
        if val_has_rows:
            X_val = pca.transform(X_val)
        X_test = pca.transform(X_test)
        print(f" PCA: {pca.n_components_} components (retaining {variance*100:.0f}% variance)")
    if X_val is not None and not val_has_rows:
        X_val = np.empty((0, X_train.shape[1]))
    return X_train, X_val, X_test, scaler, pca
# ---------------------------------------------------------------------------
# Rolling Window Validation
# ---------------------------------------------------------------------------
def rolling_window_train_test(df, feature_cols, config):
    """Rolling-window train/test for accumulation score prediction.

    A training span of ``training.rolling_train_size`` rows (default 2500)
    followed by a test span of ``training.rolling_test_size`` rows (default
    300) slides across ``df`` in steps of the test size. The last
    ``training.validation_pct`` (default 0.15) of each training span is held
    out for validation.

    Leakage guard: each row's ``target`` is built from closes up to the longest
    forward horizon ahead (see ``create_accumulation_target``). Training rows
    within that many rows of the test start would therefore encode test-period
    prices. They are purged from the training span. The gap defaults to the
    longest configured forward period and can be overridden with
    ``training.purge_gap``; 0 disables purging.

    Returns:
        The dict produced by ``compile_results``.

    Raises:
        ValueError: if ``training.rolling_test_size`` is not positive.
    """
    training_cfg = config.get("training", {})
    train_size = training_cfg.get("rolling_train_size", 2500)
    test_size = training_cfg.get("rolling_test_size", 300)
    val_pct = training_cfg.get("validation_pct", 0.15)
    if test_size <= 0:
        raise ValueError("training.rolling_test_size must be positive")
    model_type = config.get("model_type", "xgboost")
    seq_len = config.get("hyperparameters", {}).get("lstm_sequence_length", 30)
    threshold = config.get("strategy", {}).get("strong_buy_threshold", 80)

    # Longest forward horizon behind `target` (same lookup as create_accumulation_target).
    target_cfg = config.get("target", {})
    if config.get("timeframe", "4h") == "1h":
        horizons = target_cfg.get("forward_periods_1h", [168, 720, 2160])
    else:
        horizons = target_cfg.get("forward_periods_4h", [42, 180, 540])
    purge_gap = max(0, int(training_cfg.get("purge_gap", max(horizons, default=0))))

    n = len(df)
    total_possible = (n - train_size) // test_size
    all_predictions = []
    per_window_results = []
    feature_importances_sum = None
    fi_count = 0
    effective_n_features = len(feature_cols)
    window_count = 0
    last_pca = None

    start = 0
    while start + train_size + test_size <= n:
        train_end = start + train_size
        test_end = min(train_end + test_size, n)
        # Drop training rows whose forward-looking target reaches past train_end.
        train_full = df.iloc[start:max(start, train_end - purge_gap)]
        test_df = df.iloc[train_end:test_end]
        if len(test_df) < 10:
            start += test_size
            continue
        # Skip spans whose target is too short or too flat to learn from.
        train_target = train_full["target"].dropna()
        if len(train_target) < 100 or train_target.std() < 1.0:
            start += test_size
            continue

        val_split = int(len(train_full) * (1.0 - val_pct))
        train_df = train_full.iloc[:val_split]
        val_df = train_full.iloc[val_split:]
        X_train = train_df[feature_cols].values
        y_train = train_df["target"].values
        # An empty validation split is passed as None (sklearn rejects 0-row input).
        if len(val_df) > 0:
            X_val = val_df[feature_cols].values
            y_val = val_df["target"].values
        else:
            X_val = y_val = None
        X_test = test_df[feature_cols].values

        X_train, X_val, X_test, _scaler, pca = apply_scaling_pca(X_train, X_val, X_test, config)
        last_pca = pca
        current_n_features = X_train.shape[1]
        if feature_importances_sum is None:
            effective_n_features = current_n_features
            feature_importances_sum = np.zeros(effective_n_features)

        window_count += 1
        preds, fi = _train_and_predict_window(
            X_train, y_train, X_val, y_val, X_test,
            config, current_n_features, window_count - 1, total_possible
        )
        # Importances are only averaged over windows with the first window's width
        # (PCA may keep a different number of components per window).
        if fi is not None and len(fi) == effective_n_features:
            feature_importances_sum += fi
            fi_count += 1

        # Sequence models emit nothing for the first seq_len test rows.
        if model_type in ("lstm", "hybrid") and len(preds) < len(test_df):
            test_aligned = test_df.iloc[seq_len:seq_len + len(preds)]
        else:
            test_aligned = test_df.iloc[:len(preds)]
        min_len = min(len(preds), len(test_aligned))
        preds = preds[:min_len]
        test_aligned = test_aligned.iloc[:min_len]

        window_preds = [
            (pred, actual, close_px)
            for pred, actual, close_px in zip(
                preds, test_aligned["target"].values, test_aligned["close"].values
            )
            if not np.isnan(actual)
        ]
        all_predictions.extend(
            {"predicted": float(pred), "actual": float(actual), "close": float(close_px)}
            for pred, actual, close_px in window_preds
        )

        if window_preds:
            dca_avg = np.mean([close_px for _, _, close_px in window_preds])
            # Per-window figure buys on strong-buy signals only; compile_results
            # uses the good-buy threshold for the aggregate figure.
            buy_prices = [close_px for pred, _, close_px in window_preds if pred >= threshold]
            if len(buy_prices) >= 3:
                improvement = (dca_avg - np.mean(buy_prices)) / dca_avg * 100
            else:
                improvement = 0.0
            per_window_results.append(round(float(improvement), 1))
            print(f" Window {window_count}: {len(buy_prices)} signals, cost improvement={improvement:.1f}%")
        else:
            per_window_results.append(0.0)
            print(f" Window {window_count}: no valid predictions")
        start += test_size

    if last_pca is not None and config.get("features", {}).get("use_pca", False):
        effective_feature_names = [f"PC_{i+1}" for i in range(effective_n_features)]
    else:
        effective_feature_names = feature_cols
    if feature_importances_sum is None:
        feature_importances_sum = np.zeros(len(effective_feature_names))
    return compile_results(all_predictions, per_window_results,
                           feature_importances_sum, fi_count,
                           effective_feature_names, config)
def walk_forward_train_test(df, feature_cols, config):
    """Walk-forward (static windows) or rolling-window validation.

    When ``training.rolling_window`` is true (default), this delegates to
    ``rolling_window_train_test``. Otherwise ``df`` is cut into
    ``training.walk_forward_windows`` (default 5) windows, each extended by 30%
    of the window size. Each window is split into train / validation / test by
    ``training.train_pct`` (0.7) and ``training.validation_pct`` (0.15).

    Leakage guard: targets look forward up to the longest configured horizon,
    so train and validation rows within that many rows of the test start are
    purged. The gap can be overridden with ``training.purge_gap``; 0 disables
    purging.

    Returns:
        The dict produced by ``compile_results``.

    Raises:
        ValueError: if ``training.walk_forward_windows`` is not positive.
    """
    training_cfg = config.get("training", {})
    if training_cfg.get("rolling_window", True):
        return rolling_window_train_test(df, feature_cols, config)

    n_windows = training_cfg.get("walk_forward_windows", 5)
    if n_windows <= 0:
        raise ValueError("training.walk_forward_windows must be positive")
    train_pct = training_cfg.get("train_pct", 0.7)
    val_pct = training_cfg.get("validation_pct", 0.15)
    model_type = config.get("model_type", "xgboost")
    seq_len = config.get("hyperparameters", {}).get("lstm_sequence_length", 30)
    threshold = config.get("strategy", {}).get("strong_buy_threshold", 80)

    # Longest forward horizon behind `target` (same lookup as create_accumulation_target).
    target_cfg = config.get("target", {})
    if config.get("timeframe", "4h") == "1h":
        horizons = target_cfg.get("forward_periods_1h", [168, 720, 2160])
    else:
        horizons = target_cfg.get("forward_periods_4h", [42, 180, 540])
    purge_gap = max(0, int(training_cfg.get("purge_gap", max(horizons, default=0))))

    n = len(df)
    window_size = n // n_windows
    all_predictions = []
    per_window_results = []
    feature_importances_sum = None
    fi_count = 0
    effective_n_features = len(feature_cols)
    last_pca = None

    for w in range(n_windows):
        w_start = w * window_size
        # Window extends 30% of window_size past its nominal end (capped at n).
        w_end = min((w + 1) * window_size + int(window_size * 0.3), n)
        window_data = df.iloc[w_start:w_end]
        wn = len(window_data)
        train_end = int(wn * train_pct)
        val_end = int(wn * (train_pct + val_pct))
        # Rows at or after label_cutoff have targets that look into the test split.
        label_cutoff = max(0, val_end - purge_gap)
        train_df = window_data.iloc[:min(train_end, label_cutoff)]
        val_df = window_data.iloc[train_end:max(train_end, label_cutoff)]
        test_df = window_data.iloc[val_end:]
        if len(test_df) < 10:
            continue
        train_target = train_df["target"].dropna()
        if len(train_target) < 100 or train_target.std() < 1.0:
            continue

        X_train = train_df[feature_cols].values
        y_train = train_df["target"].values
        # An empty validation split is passed as None (sklearn rejects 0-row input).
        if len(val_df) > 0:
            X_val = val_df[feature_cols].values
            y_val = val_df["target"].values
        else:
            X_val = y_val = None
        X_test = test_df[feature_cols].values

        X_train, X_val, X_test, _scaler, pca = apply_scaling_pca(X_train, X_val, X_test, config)
        last_pca = pca
        current_n_features = X_train.shape[1]
        if feature_importances_sum is None:
            effective_n_features = current_n_features
            feature_importances_sum = np.zeros(effective_n_features)

        preds, fi = _train_and_predict_window(
            X_train, y_train, X_val, y_val, X_test,
            config, current_n_features, w, n_windows
        )
        if fi is not None and len(fi) == effective_n_features:
            feature_importances_sum += fi
            fi_count += 1

        # Sequence models emit nothing for the first seq_len test rows.
        if model_type in ("lstm", "hybrid"):
            test_aligned = test_df.iloc[seq_len:seq_len + len(preds)]
        else:
            test_aligned = test_df.iloc[:len(preds)]
        min_len = min(len(preds), len(test_aligned))
        preds = preds[:min_len]
        test_aligned = test_aligned.iloc[:min_len]

        window_preds = [
            (pred, actual, close_px)
            for pred, actual, close_px in zip(
                preds, test_aligned["target"].values, test_aligned["close"].values
            )
            if not np.isnan(actual)
        ]
        all_predictions.extend(
            {"predicted": float(pred), "actual": float(actual), "close": float(close_px)}
            for pred, actual, close_px in window_preds
        )

        if window_preds:
            dca_avg = np.mean([close_px for _, _, close_px in window_preds])
            buy_prices = [close_px for pred, _, close_px in window_preds if pred >= threshold]
            if len(buy_prices) >= 3:
                improvement = (dca_avg - np.mean(buy_prices)) / dca_avg * 100
            else:
                improvement = 0.0
            per_window_results.append(round(float(improvement), 1))
            print(f" Window {w+1}/{n_windows}: {len(buy_prices)} signals, improvement={improvement:.1f}%")
        else:
            per_window_results.append(0.0)
            print(f" Window {w+1}/{n_windows}: no valid predictions")

    if last_pca is not None and config.get("features", {}).get("use_pca", False):
        effective_feature_names = [f"PC_{i+1}" for i in range(effective_n_features)]
    else:
        effective_feature_names = feature_cols
    if feature_importances_sum is None:
        feature_importances_sum = np.zeros(len(effective_feature_names))
    return compile_results(all_predictions, per_window_results,
                           feature_importances_sum, fi_count,
                           effective_feature_names, config)
# ---------------------------------------------------------------------------
# Train & Predict Helpers
# ---------------------------------------------------------------------------
def _train_and_predict_window(X_train, y_train, X_val, y_val, X_test,
                              config, n_features, w_idx, n_windows):
    """Fit one window's model, then score its test rows.

    ``"hybrid"`` is delegated to ``_hybrid_train_and_predict``. The LSTM is fit
    with the validation split; other models use the training split only.

    Returns:
        ``(predictions, feature_importances)``. Predictions are clipped to
        [0, 100]. If fitting or predicting raises, a message is printed to
        stderr and ``(np.array([]), None)`` is returned. ``n_windows`` is
        accepted for call-site symmetry and is not used.
    """
    model_type = config.get("model_type", "xgboost")
    if model_type == "hybrid":
        return _hybrid_train_and_predict(
            X_train, y_train, X_val, y_val, X_test, config, n_features
        )

    is_lstm = model_type == "lstm"
    model = build_model(config, input_size=n_features) if is_lstm else build_model(config)
    fit_args = (X_train, y_train, X_val, y_val) if is_lstm else (X_train, y_train)

    try:
        model.fit(*fit_args)
    except Exception as e:
        print(f" Window {w_idx+1}: training failed -- {e}", file=sys.stderr)
        return np.array([]), None

    try:
        preds = np.clip(model.predict(X_test), 0, 100)
    except Exception as e:
        print(f" Window {w_idx+1}: prediction failed -- {e}", file=sys.stderr)
        return np.array([]), None

    return preds, _extract_feature_importances(model, n_features)
def _hybrid_train_and_predict(X_train, y_train, X_val, y_val, X_test,
                              config, n_features):
    """Hybrid: blend LSTM and XGBoost regression predictions.

    Output k scores test row ``lstm_sequence_length + k``, so XGBoost's first
    ``lstm_sequence_length`` predictions are dropped to align with the LSTM.
    The blend weight is ``hyperparameters.hybrid_lstm_weight`` (default 0.5,
    an equal blend).

    If one component fails, the other is used on its own. Blending with a
    constant 50 would squeeze every score into [25, 75], so no candle could
    reach a strong-buy threshold of 80. If both fail, ``(np.array([]), None)``
    is returned, matching ``_train_and_predict_window``.

    Returns:
        ``(predictions clipped to [0, 100], XGBoost feature importances or None)``.
    """
    hp = config.get("hyperparameters", {})
    seq_len = hp.get("lstm_sequence_length", 30)
    lstm_weight = float(hp.get("hybrid_lstm_weight", 0.5))

    lstm_preds = None
    lstm_model = build_model({**config, "model_type": "lstm"}, input_size=n_features)
    try:
        lstm_model.fit(X_train, y_train, X_val, y_val)
        lstm_preds = np.asarray(lstm_model.predict(X_test), dtype=float)
    except Exception as e:
        print(f" Hybrid LSTM failed: {e}", file=sys.stderr)

    xgb_preds = None
    xgb_model = build_model({**config, "model_type": "xgboost"})
    try:
        xgb_model.fit(X_train, y_train)
        xgb_preds = np.clip(xgb_model.predict(X_test), 0, 100)[seq_len:]
    except Exception as e:
        print(f" Hybrid XGBoost failed: {e}", file=sys.stderr)

    if lstm_preds is None and xgb_preds is None:
        return np.array([]), None
    if lstm_preds is None:
        combined = xgb_preds
    elif xgb_preds is None:
        combined = lstm_preds
    else:
        min_len = min(len(lstm_preds), len(xgb_preds))
        combined = (lstm_weight * lstm_preds[:min_len]
                    + (1.0 - lstm_weight) * xgb_preds[:min_len])
    combined = np.clip(combined, 0, 100)

    fi = _extract_feature_importances(xgb_model, n_features) if xgb_preds is not None else None
    return combined, fi
def _extract_feature_importances(model, n_features):
"""Extract normalized feature importances from a model."""
try:
if hasattr(model, "feature_importances_"):
fi = model.feature_importances_
elif hasattr(model, "get_booster"):
fi_dict = model.get_booster().get_score(importance_type="gain")
fi = np.array([fi_dict.get(f"f{i}", 0) for i in range(n_features)])
else:
return None
return fi / (fi.sum() + 1e-10)
except Exception:
return None
# ---------------------------------------------------------------------------
# Results Compilation
# ---------------------------------------------------------------------------
def compile_results(predictions, per_window_cost_improvement,
                    fi_sum, fi_count, feature_cols, config):
    """Aggregate out-of-sample predictions into the results JSON payload.

    Args:
        predictions: list of ``{"predicted", "actual", "close"}`` dicts.
        per_window_cost_improvement: per-window improvement list, passed through.
        fi_sum: summed feature-importance vector; divided by ``fi_count``
            (treated as at least 1).
        fi_count: number of windows that contributed to ``fi_sum``.
        feature_cols: names matching ``fi_sum``; the top 30 are reported.
        config: reads ``strategy.strong_buy_threshold`` (80),
            ``strategy.good_buy_threshold`` (70) and ``model_type``.

    Notes:
        * Strong / good buys are candles with ``predicted >= threshold``.
        * ``cost_basis_improvement_pct`` compares the mean close of good-buy
          candles with the mean close of all candles (DCA). It is 0 when
          there are fewer than 3 good buys.
        * ``pct_quality_strong_buy`` is a fraction (0-1) of strong buys whose
          actual score is above 50.
    """
    if not predictions:
        empty = _empty_results(per_window_cost_improvement)
        # _empty_results has no config; still report which model was run.
        empty["model_type"] = config.get("model_type", "unknown")
        return empty

    strategy = config.get("strategy", {})
    strong_threshold = strategy.get("strong_buy_threshold", 80)
    good_threshold = strategy.get("good_buy_threshold", 70)

    pred_scores = np.array([p["predicted"] for p in predictions])
    actual_scores = np.array([p["actual"] for p in predictions])
    close_prices = np.array([p["close"] for p in predictions])
    total_candles = len(predictions)

    # --- Signal quality: STRONG BUY (predicted >= strong_threshold) ---
    # actual_score is a rank-based 0-100 forward-return quality, so it is used
    # as the proxy for how good each signal turned out.
    strong_buy_mask = pred_scores >= strong_threshold
    strong_buy_count = int(np.sum(strong_buy_mask))
    if strong_buy_count > 0:
        strong_actual = actual_scores[strong_buy_mask]
        pct_profitable_strong = float(np.mean(strong_actual > 50))
        avg_quality_strong = float(np.mean(strong_actual))
    else:
        pct_profitable_strong = 0.0
        avg_quality_strong = 0.0

    # --- Cost basis vs DCA (model buys when predicted >= good_threshold) ---
    dca_avg = float(np.mean(close_prices))
    good_buy_prices = close_prices[pred_scores >= good_threshold]
    good_buy_count = len(good_buy_prices)
    if good_buy_count >= 3:
        model_avg = float(np.mean(good_buy_prices))
        cost_basis_improvement = (dca_avg - model_avg) / dca_avg * 100
    else:
        model_avg = dca_avg
        cost_basis_improvement = 0.0

    signal_frequency = strong_buy_count / total_candles * 100

    # --- Predicted score at actual extremes ---
    # "Bottoms": actual score > 85 (best ~15% buys); "tops": actual < 15.
    actual_bottom_mask = actual_scores > 85
    avg_score_at_bottoms = (
        float(np.mean(pred_scores[actual_bottom_mask])) if np.any(actual_bottom_mask) else 0.0
    )
    actual_top_mask = actual_scores < 15
    avg_score_at_tops = (
        float(np.mean(pred_scores[actual_top_mask])) if np.any(actual_top_mask) else 50.0
    )

    # --- R^2 of predicted vs actual scores ---
    ss_res = np.sum((actual_scores - pred_scores) ** 2)
    ss_tot = np.sum((actual_scores - np.mean(actual_scores)) ** 2)
    r2 = 1 - ss_res / (ss_tot + 1e-10)

    # --- Feature importances (top 30 by averaged importance) ---
    fi_avg = fi_sum / max(fi_count, 1)
    fi_sorted = sorted(zip(feature_cols, fi_avg), key=lambda x: -x[1])
    feature_importances = {name: round(float(val), 4) for name, val in fi_sorted[:30]}

    # --- Predicted score histogram (last bin includes 100) ---
    score_distribution = {}
    for lo, hi in [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100)]:
        upper = hi if hi < 100 else 101
        score_distribution[f"{lo}-{hi}"] = int(np.sum((pred_scores >= lo) & (pred_scores < upper)))

    return {
        "cost_basis_improvement_pct": round(cost_basis_improvement, 2),
        "avg_cost_basis_model": round(model_avg, 2),
        "avg_cost_basis_dca": round(dca_avg, 2),
        "strong_buy_signal_count": strong_buy_count,
        "good_buy_signal_count": good_buy_count,
        "total_candles_tested": total_candles,
        "signal_frequency_pct": round(signal_frequency, 2),
        "pct_quality_strong_buy": round(pct_profitable_strong, 3),
        "avg_score_at_actual_bottoms": round(avg_score_at_bottoms, 1),
        "avg_score_at_actual_tops": round(avg_score_at_tops, 1),
        "model_r2_score": round(float(r2), 4),
        "avg_quality_score_strong_buy": round(avg_quality_strong, 1),
        "feature_importances": feature_importances,
        "per_window_cost_improvement": per_window_cost_improvement,
        "score_distribution": score_distribution,
        "model_type": config.get("model_type", "unknown"),
    }
def _empty_results(per_window):
return {
"cost_basis_improvement_pct": 0.0,
"avg_cost_basis_model": 0.0,
"avg_cost_basis_dca": 0.0,
"strong_buy_signal_count": 0,
"good_buy_signal_count": 0,
"total_candles_tested": 0,
"signal_frequency_pct": 0.0,
"pct_quality_strong_buy": 0.0,
"avg_score_at_actual_bottoms": 0.0,
"avg_score_at_actual_tops": 0.0,
"model_r2_score": 0.0,
"avg_quality_score_strong_buy": 0.0,
"feature_importances": {},
"per_window_cost_improvement": per_window,
"score_distribution": {},
"model_type": "unknown",
}
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Command-line entry point: load config and OHLCV data, backtest, save JSON.

    Reads ``--config`` (JSON) and ``--data`` (CSV with a ``timestamp`` column
    plus ``open``/``high``/``low``/``close``/``volume``). It builds features and
    accumulation targets, then runs walk-forward / rolling validation. The
    results dict is written to ``--output`` and a summary is printed.
    """
    parser = argparse.ArgumentParser(description="BTC Accumulation Signal Optimizer")
    parser.add_argument("--config", required=True, help="Path to config JSON")
    parser.add_argument("--data", required=True, help="Path to OHLCV CSV")
    parser.add_argument("--output", required=True, help="Path to output results JSON")
    args = parser.parse_args()

    with open(args.config, encoding="utf-8") as f:
        config = json.load(f)

    print(f"Loading data from {args.data}...")
    df = pd.read_csv(args.data, parse_dates=["timestamp"])
    # Rolling features and forward-return targets assume rows in time order.
    df = df.sort_values("timestamp", kind="stable").reset_index(drop=True)
    print(f" {len(df)} rows, {df['timestamp'].iloc[0]} -> {df['timestamp'].iloc[-1]}")

    print("Computing accumulation features...")
    df = compute_features(df, config)
    print("Creating accumulation score targets...")
    df["target"] = create_accumulation_target(df, config)

    feature_cols = [c for c in df.columns
                    if c not in ["timestamp", "open", "high", "low", "close", "volume", "target"]]
    # Ratio features (e.g. volume / rolling mean volume) become +/-inf when the
    # denominator is 0. dropna() keeps such rows and StandardScaler rejects
    # infinite values, so treat them as missing before dropping.
    df[feature_cols] = df[feature_cols].replace([np.inf, -np.inf], np.nan)
    df = df.dropna(subset=feature_cols + ["target"]).reset_index(drop=True)
    print(f" {len(df)} rows after cleanup, {len(feature_cols)} features")
    target_stats = df["target"].describe()
    print(f" Target stats: mean={target_stats['mean']:.1f}, std={target_stats['std']:.1f}, "
          f"min={target_stats['min']:.1f}, max={target_stats['max']:.1f}")

    model_type = config.get("model_type", "xgboost")
    rolling = config.get("training", {}).get("rolling_window", True)
    print(f"\nModel: {model_type}, Rolling: {rolling}")
    print("Running accumulation signal optimization...")
    results = walk_forward_train_test(df, feature_cols, config)

    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\nResults saved to {args.output}")
    print(f" Cost Basis Improvement: {results['cost_basis_improvement_pct']:.1f}%")
    print(f" Strong Buy Signals: {results['strong_buy_signal_count']}")
    print(f" Signal Frequency: {results['signal_frequency_pct']:.1f}%")
    print(f" Model R2: {results['model_r2_score']:.4f}")
    print(f" Score at Bottoms: {results['avg_score_at_actual_bottoms']:.1f}")
    print(f" Score at Tops: {results['avg_score_at_actual_tops']:.1f}")
# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()