Replace day-trading bot with long-term accumulation signal model.

Predicts optimal BUY times using forward-return analysis at 7d/30d/90d horizons, scoring each candle 0-100. The primary metric is now cost_basis_improvement_pct (model buy price vs. DCA).

- train_and_backtest.py: regression models (XGBoost/LSTM hybrid), accumulation-focused features (price position, momentum, volatility, volume, cycle), forward-return targets, signal-quality backtesting
- orchestrator.py: cost-improvement scoring, signal-count validation
- analyzer.py: accumulation-focused LLM system prompt
- dashboard: cost-improvement display, signal-metrics table
- config: new accumulation-focused parameters

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1043 lines
39 KiB
Python
Executable File
1043 lines
39 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
BTC Accumulation Signal Optimizer -- Train & Backtest Engine
|
|
Self-contained script that runs on the Windows PC with GPU.
|
|
|
|
Predicts the best times to BUY BTC for long-term holding by scoring
|
|
each candle with an Accumulation Score (0-100).
|
|
|
|
Usage:
|
|
python train_and_backtest.py --config config.json --data btc_4h.csv --output results.json
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import warnings
|
|
import numpy as np
|
|
import pandas as pd
|
|
from datetime import datetime
|
|
|
|
import ta
|
|
from ta.momentum import RSIIndicator, StochasticOscillator, WilliamsRIndicator, ROCIndicator
|
|
from ta.trend import MACD, CCIIndicator, SMAIndicator, EMAIndicator
|
|
from ta.volatility import BollingerBands, AverageTrueRange
|
|
from ta.volume import OnBalanceVolumeIndicator
|
|
|
|
from sklearn.preprocessing import StandardScaler
|
|
from sklearn.decomposition import PCA
|
|
|
|
warnings.filterwarnings("ignore")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Feature Engineering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _last_value_pct_rank(window):
    """Return the percentile rank of the last element of ``window``.

    Matches ``pd.Series(window).rank(pct=True).iloc[-1]``: ties receive the
    average rank and NaNs are excluded from the denominator. Written against
    a raw ndarray so ``rolling(...).apply(..., raw=True)`` does not have to
    build and rank a full Series for every window.
    """
    last = window[-1]
    if np.isnan(last):
        return np.nan
    finite = window[~np.isnan(window)]
    below = np.count_nonzero(finite < last)
    ties = np.count_nonzero(finite == last)  # includes the last value itself
    return (below + (ties + 1) / 2.0) / finite.size


def compute_features(df, config):
    """Compute accumulation-focused features from OHLCV data.

    Args:
        df: DataFrame with ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns. Feature columns are added to it in place.
        config: Run configuration. ``config["features"]`` holds the
            ``use_price_position`` / ``use_momentum`` / ``use_volatility`` /
            ``use_volume`` / ``use_cycle`` switches (all default to True).
            ``config["timeframe"]`` (``"1h"`` or anything else, treated as
            4h) sizes the one-year look-back windows.

    Returns:
        The same ``df`` object with the feature columns added.
    """
    feat = config.get("features", {})
    c, h, l, o, v = df["close"], df["high"], df["low"], df["open"], df["volume"]

    # --- Price Position ---
    if feat.get("use_price_position", True):
        # Distance from ATH (all-time high seen so far in the dataset) as %
        rolling_ath = c.expanding().max()
        df["dist_from_ath_pct"] = (c - rolling_ath) / rolling_ath * 100

        # One year of candles: 2190 for 4h data, 8760 for 1h data. The
        # window was previously fixed at 2190, which is ~91 days of 1h data.
        candles_per_year = 8760 if config.get("timeframe", "4h") == "1h" else 2190
        year_window = min(candles_per_year, len(df) - 1)

        # Distance from 52-week high/low
        if year_window > 50:
            rolling_52w_high = h.rolling(year_window, min_periods=50).max()
            rolling_52w_low = l.rolling(year_window, min_periods=50).min()
            df["dist_from_52w_high_pct"] = (c - rolling_52w_high) / rolling_52w_high * 100
            df["dist_from_52w_low_pct"] = (c - rolling_52w_low) / rolling_52w_low * 100

        # Price vs SMA(50) and SMA(200)
        sma50 = SMAIndicator(c, window=50).sma_indicator()
        sma200 = SMAIndicator(c, window=200).sma_indicator()
        df["SMA_50"] = sma50
        df["SMA_200"] = sma200
        df["price_vs_sma50_pct"] = (c - sma50) / sma50 * 100
        df["price_vs_sma200_pct"] = (c - sma200) / sma200 * 100
        df["sma50_vs_sma200"] = (sma50 - sma200) / sma200 * 100  # golden/death cross

        # Percentile of the current close within the trailing one-year window
        if year_window > 50:
            df["price_percentile_365"] = c.rolling(year_window, min_periods=50).apply(
                _last_value_pct_rank, raw=True
            )

        # Additional SMAs
        for p in [10, 20]:
            sma = SMAIndicator(c, window=p).sma_indicator()
            df[f"price_vs_sma{p}_pct"] = (c - sma) / sma * 100

    # --- Momentum / Oversold ---
    if feat.get("use_momentum", True):
        df["RSI_14"] = RSIIndicator(c, window=14).rsi()
        df["RSI_7"] = RSIIndicator(c, window=7).rsi()

        macd = MACD(c, window_slow=26, window_fast=12, window_sign=9)
        df["MACD_line"] = macd.macd()
        df["MACD_signal"] = macd.macd_signal()
        df["MACD_hist"] = macd.macd_diff()

        stoch = StochasticOscillator(h, l, c, window=14, smooth_window=3)
        df["stoch_k"] = stoch.stoch()
        df["stoch_d"] = stoch.stoch_signal()

        df["williams_r"] = WilliamsRIndicator(h, l, c, lbp=14).williams_r()

        df["ROC_30"] = ROCIndicator(c, window=30).roc()
        df["ROC_90"] = ROCIndicator(c, window=90).roc()

    # --- Volatility / Fear ---
    if feat.get("use_volatility", True):
        bb = BollingerBands(c, window=20, window_dev=2)
        df["BB_width"] = (bb.bollinger_hband() - bb.bollinger_lband()) / c
        df["BB_pctb"] = bb.bollinger_pband()
        df["price_vs_lower_bb"] = (c - bb.bollinger_lband()) / c

        atr = AverageTrueRange(h, l, c, window=14)
        df["ATR_14"] = atr.average_true_range()
        df["ATR_pct"] = df["ATR_14"] / c * 100

        # Consecutive red candles: running count inside each run of equal
        # is_red values, zeroed on green candles by the final multiplication.
        is_red = (c < o).astype(int)
        df["consecutive_red"] = is_red.groupby(
            (is_red != is_red.shift()).cumsum()
        ).cumsum() * is_red

        # Drawdown from the highest close of the last 30 candles
        if len(df) > 30:
            rolling_max_30 = c.rolling(30, min_periods=1).max()
            df["drawdown_30"] = (c - rolling_max_30) / rolling_max_30 * 100

        # Historical volatility.
        # NOTE(review): sqrt(252) is the daily-bar annualisation factor while
        # the candles here are intraday; it only rescales the feature by a
        # constant, so it is left unchanged -- confirm whether that is intended.
        df["hist_volatility_20"] = c.pct_change().rolling(20).std() * np.sqrt(252)

    # --- Volume ---
    if feat.get("use_volume", True):
        df["OBV"] = OnBalanceVolumeIndicator(c, v).on_balance_volume()
        vol_sma20 = v.rolling(20).mean()
        df["volume_ratio"] = v / vol_sma20

        # Volume on red vs green candles ratio (rolling 20)
        is_green = (c >= o).astype(float)
        is_red_f = (c < o).astype(float)
        green_vol = (v * is_green).rolling(20).sum()
        red_vol = (v * is_red_f).rolling(20).sum()
        df["red_green_vol_ratio"] = red_vol / (green_vol + 1e-10)

        # OBV trend: 20-candle change relative to the mean absolute OBV level
        df["OBV_slope"] = df["OBV"].diff(20) / (df["OBV"].rolling(20).mean().abs() + 1e-10)

    # --- Cycle ---
    if feat.get("use_cycle", True):
        # SMA(50) vs SMA(200) regime; reuse the column if price position made it
        if "sma50_vs_sma200" not in df.columns:
            sma50 = SMAIndicator(c, window=50).sma_indicator()
            sma200 = SMAIndicator(c, window=200).sma_indicator()
            df["sma50_vs_sma200"] = (sma50 - sma200) / sma200 * 100

        # Candles elapsed inside the current stretch without a major drawdown
        # (close more than 20% below the running peak).
        rolling_peak = c.expanding().max()
        drawdown_from_peak = (c - rolling_peak) / rolling_peak
        major_dd = (drawdown_from_peak < -0.20).astype(int)
        dd_groups = major_dd.groupby((major_dd != major_dd.shift()).cumsum())
        df["candles_since_major_dd"] = dd_groups.cumcount()
        # Candles that are themselves in a major drawdown always read 0
        df.loc[major_dd == 1, "candles_since_major_dd"] = 0

    return df
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Target: Accumulation Score
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_accumulation_target(df, config):
    """Create the accumulation score target from forward returns.

    For each candle the percentage return ``period`` candles ahead is
    computed for every configured horizon. Each horizon's returns are
    converted to percentile ranks over the whole series (higher return gives
    a higher rank) and the ranks are combined with normalised weights into a
    score scaled to 0-100.

    Args:
        df: DataFrame with a ``close`` column.
        config: ``config["timeframe"]`` selects ``forward_periods_1h`` or
            ``forward_periods_4h`` from ``config["target"]``;
            ``config["target"]["weights"]`` gives one weight per horizon.

    Returns:
        A Series named ``"target"`` indexed like ``df``. Rows where any
        horizon has no forward return (the tail of the series) are NaN.

    Raises:
        ZeroDivisionError: if the weights that are used sum to zero.
    """
    from scipy.stats import rankdata

    tgt = config.get("target", {})
    timeframe = config.get("timeframe", "4h")

    if timeframe == "1h":
        forward_periods = tgt.get("forward_periods_1h", [168, 720, 2160])
    else:
        forward_periods = tgt.get("forward_periods_4h", [42, 180, 540])

    weights = tgt.get("weights", [0.2, 0.3, 0.5])

    # Pair horizons with weights first, then normalise. Normalising the full
    # weight list and letting zip() drop the surplus would cap the score
    # below 100 whenever the two lists differ in length.
    n_horizons = min(len(forward_periods), len(weights))
    forward_periods = list(forward_periods)[:n_horizons]
    weights = list(weights)[:n_horizons]
    w_sum = sum(weights)
    weights = [w / w_sum for w in weights]

    close = df["close"].values
    n = len(close)

    # Percentile rank (0-1] of the forward return for each horizon
    ranked = []
    for period in forward_periods:
        fwd = np.full(n, np.nan)
        if 0 <= period < n:
            base = close[:n - period]
            fwd[:n - period] = (close[period:] - base) / base * 100

        ranks = np.full(n, np.nan)
        valid_mask = ~np.isnan(fwd)
        n_valid = int(valid_mask.sum())
        if n_valid > 0:
            ranks[valid_mask] = rankdata(fwd[valid_mask], method="average") / n_valid
        ranked.append(ranks)

    # Weighted combination of ranks -> accumulation score (0-100)
    score = np.zeros(n)
    valid = np.ones(n, dtype=bool)
    for ranks, w in zip(ranked, weights):
        missing = np.isnan(ranks)
        valid &= ~missing
        score += w * np.where(missing, 0, ranks)

    score = score * 100
    score[~valid] = np.nan  # a score needs every horizon to be observable

    return pd.Series(score, index=df.index, name="target")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# LSTM Regressor (PyTorch)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_device():
    """Return the torch device to run on: CUDA when available, otherwise CPU."""
    import torch

    backend = "cuda" if torch.cuda.is_available() else "cpu"
    return torch.device(backend)
|
|
|
|
|
|
class LSTMRegressor:
    """PyTorch LSTM for regression of accumulation scores.

    Exposes ``fit`` / ``predict`` like the tree-based regressors in this
    file. ``predict`` returns one value per sequence, i.e.
    ``len(X) - sequence_length`` values: prediction ``j`` is made from rows
    ``j .. j + sequence_length - 1`` and lines up with row
    ``j + sequence_length`` of the input.
    """

    def __init__(self, input_size, hp):
        """Build the network.

        Args:
            input_size: Number of features per time step.
            hp: Hyperparameter dict. Reads ``lstm_sequence_length`` (30),
                ``lstm_hidden_size`` (128), ``lstm_num_layers`` (2) and
                ``lstm_dropout`` (0.3) here; ``fit`` reads the training keys.
        """
        import torch.nn as nn

        self.hp = hp
        self.device = get_device()
        self.sequence_length = hp.get("lstm_sequence_length", 30)

        hidden_size = hp.get("lstm_hidden_size", 128)
        num_layers = hp.get("lstm_num_layers", 2)
        dropout = hp.get("lstm_dropout", 0.3)

        class _LSTMNet(nn.Module):
            def __init__(self_net):
                super().__init__()
                self_net.lstm = nn.LSTM(
                    input_size=input_size,
                    hidden_size=hidden_size,
                    num_layers=num_layers,
                    batch_first=True,
                    # nn.LSTM only applies dropout between stacked layers
                    dropout=dropout if num_layers > 1 else 0.0,
                )
                self_net.dropout = nn.Dropout(dropout)
                self_net.fc = nn.Linear(hidden_size, 1)

            def forward(self_net, x):
                lstm_out, _ = self_net.lstm(x)
                last_hidden = lstm_out[:, -1, :]  # output at the final time step
                out = self_net.dropout(last_hidden)
                out = self_net.fc(out)
                return out.squeeze(-1)

        self.model = _LSTMNet().to(self.device)
        # Kept so the object looks like the tree models; there is no LSTM
        # importance measure, so it stays None.
        self.feature_importances_ = None

    def _make_sequences(self, X, y=None):
        """Convert flat feature arrays into overlapping sequences.

        Sequence ``k`` holds rows ``k .. k + seq_len - 1`` of ``X`` and is
        paired with ``y[k + seq_len]``. When ``X`` has no more than
        ``seq_len`` rows the result is an empty, correctly shaped tensor.

        Returns:
            ``X_seq`` alone when ``y`` is None, else ``(X_seq, y_seq)``.
        """
        import torch

        seq_len = self.sequence_length
        X = np.asarray(X)
        positions = range(seq_len, len(X))

        if len(positions) > 0:
            windows = np.stack([X[i - seq_len:i] for i in positions])
        else:
            n_feat = X.shape[1] if X.ndim == 2 else 0
            windows = np.empty((0, seq_len, n_feat), dtype=np.float32)
        X_seq = torch.as_tensor(windows, dtype=torch.float32).to(self.device)

        if y is None:
            return X_seq

        targets = np.asarray([y[i] for i in positions], dtype=np.float32)
        y_seq = torch.as_tensor(targets, dtype=torch.float32).to(self.device)
        return X_seq, y_seq

    def fit(self, X_train, y_train, X_val=None, y_val=None):
        """Train with Adam + MSE, with early stopping when validation data exists.

        Sequences whose target is NaN/inf are dropped from both the training
        and the validation set: a single NaN target makes the batch loss NaN
        and corrupts the weights, and a NaN validation loss can never improve
        on the best loss, so early stopping would fire without a saved state.

        Reads ``learning_rate`` (0.001), ``lstm_epochs`` (100),
        ``lstm_batch_size`` (64) and ``lstm_patience`` (10) from ``self.hp``.

        Raises:
            ValueError: if no training sequence with a finite target remains.
        """
        import torch
        import torch.nn as nn

        lr = self.hp.get("learning_rate", 0.001)
        epochs = self.hp.get("lstm_epochs", 100)
        batch_size = self.hp.get("lstm_batch_size", 64)
        patience = self.hp.get("lstm_patience", 10)

        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        criterion = nn.MSELoss()

        X_seq, y_seq = self._make_sequences(X_train, y_train)
        usable = torch.isfinite(y_seq)
        X_seq, y_seq = X_seq[usable], y_seq[usable]
        n_samples = len(X_seq)
        if n_samples == 0:
            raise ValueError(
                "LSTM training needs at least one sequence with a finite target"
            )

        has_val = False
        if X_val is not None and y_val is not None:
            X_val_seq, y_val_seq = self._make_sequences(X_val, y_val)
            usable_val = torch.isfinite(y_val_seq)
            X_val_seq, y_val_seq = X_val_seq[usable_val], y_val_seq[usable_val]
            has_val = len(X_val_seq) > 0

        best_val_loss = float("inf")
        patience_counter = 0
        best_state = None

        self.model.train()

        for epoch in range(epochs):
            # Fresh shuffle of the sequences every epoch
            perm = torch.randperm(n_samples)
            X_seq_shuffled = X_seq[perm]
            y_seq_shuffled = y_seq[perm]

            epoch_loss = 0.0
            n_batches = 0
            for start in range(0, n_samples, batch_size):
                end = min(start + batch_size, n_samples)
                X_batch = X_seq_shuffled[start:end]
                y_batch = y_seq_shuffled[start:end]

                optimizer.zero_grad()
                preds = self.model(X_batch)
                loss = criterion(preds, y_batch)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                optimizer.step()
                epoch_loss += loss.item()
                n_batches += 1

            avg_loss = epoch_loss / max(n_batches, 1)

            if has_val:
                self.model.eval()
                with torch.no_grad():
                    val_preds = self.model(X_val_seq)
                    val_loss = criterion(val_preds, y_val_seq).item()
                self.model.train()

                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    patience_counter = 0
                    # Snapshot the weights so the best epoch can be restored
                    best_state = {k: v.clone() for k, v in self.model.state_dict().items()}
                else:
                    patience_counter += 1
                    if patience_counter >= patience:
                        print(f" LSTM early stop at epoch {epoch+1}, val_loss={val_loss:.4f}")
                        break

                if (epoch + 1) % 20 == 0:
                    print(f" Epoch {epoch+1}/{epochs}: loss={avg_loss:.4f}, val_loss={val_loss:.4f}")
            else:
                if (epoch + 1) % 20 == 0:
                    print(f" Epoch {epoch+1}/{epochs}: loss={avg_loss:.4f}")

        if best_state is not None:
            self.model.load_state_dict(best_state)

    def predict(self, X):
        """Return predictions clipped to 0-100, one per sequence in ``X``.

        Returns an empty array when ``X`` has no more than
        ``sequence_length`` rows, instead of pushing an empty batch through
        the network.
        """
        import torch

        X_seq = self._make_sequences(X)
        if len(X_seq) == 0:
            return np.empty(0, dtype=np.float32)

        self.model.eval()
        with torch.no_grad():
            preds = self.model(X_seq).cpu().numpy()
        # Clamp to 0-100
        return np.clip(preds, 0, 100)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Model Building
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _cuda_available():
    """Return True when PyTorch is importable and reports a CUDA device."""
    try:
        import torch
    except ImportError:
        return False
    return torch.cuda.is_available()


def build_model(config, input_size=0):
    """Build an unfitted regression model based on config.

    Args:
        config: Reads ``model_type`` (``"xgboost"`` by default; also
            ``"lstm"``, ``"lightgbm"``, ``"catboost"``) and the
            ``hyperparameters`` dict.
        input_size: Number of input features; only used by the LSTM.

    Returns:
        An ``LSTMRegressor``, ``XGBRegressor``, ``LGBMRegressor`` or
        ``CatBoostRegressor``.

    Raises:
        ValueError: for ``"ensemble"`` (superseded by ``"hybrid"``) and for
            any unrecognised ``model_type``.
    """
    model_type = config.get("model_type", "xgboost")
    hp = config.get("hyperparameters", {})

    if model_type == "lstm":
        return LSTMRegressor(input_size, hp)

    if model_type == "xgboost":
        import xgboost as xgb

        params = {
            "learning_rate": hp.get("learning_rate", 0.01),
            "max_depth": hp.get("max_depth", 5),
            "n_estimators": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "colsample_bytree": hp.get("colsample_bytree", 0.8),
            "min_child_weight": hp.get("min_child_weight", 10),
            "gamma": hp.get("gamma", 0.3),
            "reg_alpha": hp.get("reg_alpha", 0.5),
            "reg_lambda": hp.get("reg_lambda", 3.0),
            "objective": "reg:squarederror",
            "eval_metric": "rmse",
            "random_state": 42,
            "device": "cuda" if _cuda_available() else "cpu",
            "verbosity": 0,
        }
        return xgb.XGBRegressor(**params)

    elif model_type == "lightgbm":
        import lightgbm as lgb

        params = {
            "learning_rate": hp.get("learning_rate", 0.01),
            "max_depth": hp.get("max_depth", 5),
            "n_estimators": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "colsample_bytree": hp.get("colsample_bytree", 0.8),
            # The shared "min_child_weight" key drives LightGBM's min_child_samples
            "min_child_samples": hp.get("min_child_weight", 10),
            "reg_alpha": hp.get("reg_alpha", 0.5),
            "reg_lambda": hp.get("reg_lambda", 3.0),
            "objective": "regression",
            "metric": "rmse",
            "random_state": 42,
            "verbose": -1,
            # Decide the device up front. Wrapping the constructor in
            # try/except never fell back to CPU, because LGBMRegressor()
            # does not validate the device until fit().
            # NOTE(review): CUDA presence is used as a proxy for a usable
            # LightGBM GPU build -- confirm on the target machine.
            "device": "gpu" if _cuda_available() else "cpu",
        }
        return lgb.LGBMRegressor(**params)

    elif model_type == "catboost":
        from catboost import CatBoostRegressor

        params = {
            "learning_rate": hp.get("learning_rate", 0.01),
            "depth": hp.get("max_depth", 5),
            "iterations": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "l2_leaf_reg": hp.get("reg_lambda", 3.0),
            "loss_function": "RMSE",
            "random_seed": 42,
            "verbose": 0,
            "task_type": "GPU" if _cuda_available() else "CPU",
        }
        return CatBoostRegressor(**params)

    elif model_type == "ensemble":
        raise ValueError("Use 'hybrid' instead of 'ensemble' for accumulation mode")

    else:
        raise ValueError(f"Unknown model_type: {model_type}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Scaling & PCA
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def apply_scaling_pca(X_train, X_val, X_test, config):
    """Standardise the feature matrices and optionally project them with PCA.

    Both transforms are fitted on ``X_train`` only and then applied to
    ``X_val`` (when it is not None) and ``X_test``.

    Args:
        X_train, X_val, X_test: Feature matrices; ``X_val`` may be None.
        config: ``config["features"]`` supplies ``use_scaler`` (default
            True), ``use_pca`` (default False) and ``pca_variance``
            (default 0.95, passed to PCA as ``n_components``).

    Returns:
        ``(X_train, X_val, X_test, scaler, pca)``; ``scaler`` / ``pca`` are
        None when the corresponding step is disabled.
    """
    options = config.get("features", {})
    scaler, pca = None, None

    if options.get("use_scaler", True):
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_val = scaler.transform(X_val) if X_val is not None else X_val
        X_test = scaler.transform(X_test)

    if options.get("use_pca", False):
        variance = options.get("pca_variance", 0.95)
        pca = PCA(n_components=variance, svd_solver="full")
        X_train = pca.fit_transform(X_train)
        X_val = pca.transform(X_val) if X_val is not None else X_val
        X_test = pca.transform(X_test)
        print(f" PCA: {pca.n_components_} components (retaining {variance*100:.0f}% variance)")

    return X_train, X_val, X_test, scaler, pca
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rolling Window Validation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def rolling_window_train_test(df, feature_cols, config):
    """Rolling window train/test for accumulation score prediction.

    Slides a ``rolling_train_size`` training window followed by a
    ``rolling_test_size`` test window across ``df``, stepping by the test
    size. For each window the tail ``validation_pct`` of the training slice
    is used for validation, features are scaled (and optionally
    PCA-projected), a model is trained, and its test predictions are
    collected together with a per-window cost improvement versus buying
    every candle.

    Args:
        df: DataFrame holding ``feature_cols`` plus ``target`` and ``close``.
        feature_cols: Names of the feature columns.
        config: Run configuration (``training``, ``strategy``,
            ``hyperparameters``, ``features``, ``model_type``).

    Returns:
        Whatever ``compile_results`` returns for the collected predictions.

    Raises:
        ValueError: if ``training.rolling_test_size`` is smaller than 1; the
            window could never advance and the loop would not terminate.
    """
    training_cfg = config.get("training", {})
    train_size = training_cfg.get("rolling_train_size", 2500)
    test_size = training_cfg.get("rolling_test_size", 300)
    val_pct = training_cfg.get("validation_pct", 0.15)
    model_type = config.get("model_type", "xgboost")
    seq_len = config.get("hyperparameters", {}).get("lstm_sequence_length", 30)
    threshold = config.get("strategy", {}).get("strong_buy_threshold", 80)

    if test_size < 1:
        raise ValueError(
            f"training.rolling_test_size must be >= 1, got {test_size!r}"
        )

    n = len(df)
    total_possible = (n - train_size) // test_size
    all_predictions = []  # dicts of predicted score, actual score, close price
    per_window_results = []
    feature_importances_sum = None
    fi_count = 0
    effective_n_features = len(feature_cols)
    window_count = 0
    last_pca = None

    # NOTE(review): targets are built from closes up to the longest forward
    # horizon ahead (see create_accumulation_target), so labels at the end of
    # each training slice depend on prices inside the test slice. Confirm
    # whether a purge/embargo gap is wanted before trusting the backtest.
    start = 0
    while start + train_size + test_size <= n:
        train_end = start + train_size
        test_end = min(train_end + test_size, n)
        train_full = df.iloc[start:train_end]
        test_df = df.iloc[train_end:test_end]
        # Every path below moves on to the next window, so advance now.
        start += test_size

        if len(test_df) < 10:
            continue

        # Skip windows whose training target is too short or nearly constant
        train_target = train_full["target"].dropna()
        if len(train_target) < 100 or train_target.std() < 1.0:
            continue

        # Split train into train/val (validation is the most recent part)
        val_split = int(len(train_full) * (1.0 - val_pct))
        train_df = train_full.iloc[:val_split]
        val_df = train_full.iloc[val_split:]

        # Scale and PCA
        X_train, X_val, X_test, _, pca = apply_scaling_pca(
            train_df[feature_cols].values,
            val_df[feature_cols].values,
            test_df[feature_cols].values,
            config,
        )
        last_pca = pca
        current_n_features = X_train.shape[1]
        if feature_importances_sum is None:
            # The first trained window fixes the importance vector length
            effective_n_features = current_n_features
            feature_importances_sum = np.zeros(effective_n_features)

        window_count += 1

        # Train and predict
        preds, fi = _train_and_predict_window(
            X_train, train_df["target"].values, X_val, val_df["target"].values, X_test,
            config, current_n_features, window_count - 1, total_possible
        )

        if fi is not None and len(fi) == effective_n_features:
            feature_importances_sum += fi
            fi_count += 1

        # Align predictions with test data: sequence models emit nothing for
        # the first seq_len test rows.
        if model_type in ("lstm", "hybrid") and len(preds) < len(test_df):
            test_aligned = test_df.iloc[seq_len:seq_len + len(preds)]
        else:
            test_aligned = test_df.iloc[:len(preds)]

        min_len = min(len(preds), len(test_aligned))
        preds = preds[:min_len]
        test_aligned = test_aligned.iloc[:min_len]

        # Keep only rows whose actual score is known
        scored = [
            (pred, actual, close)
            for pred, actual, close in zip(
                preds, test_aligned["target"].values, test_aligned["close"].values
            )
            if not np.isnan(actual)
        ]
        all_predictions.extend(
            {"predicted": float(pred), "actual": float(actual), "close": float(close)}
            for pred, actual, close in scored
        )

        # Per-window cost improvement
        if scored:
            dca_avg = np.mean([close for _, _, close in scored])
            # Model buys only where the predicted score reaches the
            # strong-buy threshold.
            buy_prices = [close for pred, _, close in scored if pred >= threshold]
            if len(buy_prices) >= 3:
                model_avg = np.mean(buy_prices)
                improvement = (dca_avg - model_avg) / dca_avg * 100
            else:
                improvement = 0.0  # too few signals to compare against DCA
            per_window_results.append(round(improvement, 1))
            print(f" Window {window_count}: {len(buy_prices)} signals, cost improvement={improvement:.1f}%")
        else:
            per_window_results.append(0.0)
            print(f" Window {window_count}: no valid predictions")

    # Build feature names
    if last_pca is not None and config.get("features", {}).get("use_pca", False):
        effective_feature_names = [f"PC_{i+1}" for i in range(effective_n_features)]
    else:
        effective_feature_names = feature_cols

    if feature_importances_sum is None:
        feature_importances_sum = np.zeros(len(effective_feature_names))

    return compile_results(all_predictions, per_window_results,
                           feature_importances_sum, fi_count,
                           effective_feature_names, config)
|
|
|
|
|
|
def walk_forward_train_test(df, feature_cols, config):
    """Validate with rolling windows (default) or static walk-forward windows.

    When ``training.rolling_window`` is true the work is delegated to
    ``rolling_window_train_test``. Otherwise ``df`` is cut into
    ``walk_forward_windows`` slices, each extended by 30% of the slice
    length, and every slice is split chronologically into train /
    validation / test by ``train_pct`` and ``validation_pct``.

    Returns:
        Whatever ``compile_results`` returns for the collected predictions.
    """
    training_cfg = config.get("training", {})
    if training_cfg.get("rolling_window", True):
        return rolling_window_train_test(df, feature_cols, config)

    # Static walk-forward: split into N windows
    n_windows = training_cfg.get("walk_forward_windows", 5)
    train_pct = training_cfg.get("train_pct", 0.7)
    val_pct = training_cfg.get("validation_pct", 0.15)

    uses_sequences = config.get("model_type", "xgboost") in ("lstm", "hybrid")
    seq_len = config.get("hyperparameters", {}).get("lstm_sequence_length", 30)
    threshold = config.get("strategy", {}).get("strong_buy_threshold", 80)

    n = len(df)
    window_size = n // n_windows

    all_predictions = []
    per_window_results = []
    feature_importances_sum = None
    fi_count = 0
    effective_n_features = len(feature_cols)
    last_pca = None

    for w in range(n_windows):
        lower = w * window_size
        upper = min((w + 1) * window_size + int(window_size * 0.3), n)

        window_data = df.iloc[lower:upper].copy()
        rows = len(window_data)
        train_end = int(rows * train_pct)
        val_end = int(rows * (train_pct + val_pct))

        train_df = window_data.iloc[:train_end]
        val_df = window_data.iloc[train_end:val_end]
        test_df = window_data.iloc[val_end:]

        if len(test_df) < 10:
            continue

        # Skip windows whose training target is too short or nearly constant
        train_target = train_df["target"].dropna()
        if len(train_target) < 100 or train_target.std() < 1.0:
            continue

        X_train, X_val, X_test, _, pca = apply_scaling_pca(
            train_df[feature_cols].values,
            val_df[feature_cols].values,
            test_df[feature_cols].values,
            config,
        )
        last_pca = pca
        current_n_features = X_train.shape[1]
        if feature_importances_sum is None:
            effective_n_features = current_n_features
            feature_importances_sum = np.zeros(effective_n_features)

        preds, fi = _train_and_predict_window(
            X_train, train_df["target"].values, X_val, val_df["target"].values, X_test,
            config, current_n_features, w, n_windows
        )

        if fi is not None and len(fi) == effective_n_features:
            feature_importances_sum += fi
            fi_count += 1

        # Sequence models emit nothing for the first seq_len test rows
        if uses_sequences:
            test_aligned = test_df.iloc[seq_len:seq_len + len(preds)]
        else:
            test_aligned = test_df.iloc[:len(preds)]

        min_len = min(len(preds), len(test_aligned))
        preds = preds[:min_len]
        test_aligned = test_aligned.iloc[:min_len]

        # Keep only rows whose actual score is known
        scored = [
            row
            for row in zip(preds, test_aligned["target"].values, test_aligned["close"].values)
            if not np.isnan(row[1])
        ]
        for pred_score, actual_score, close_price in scored:
            all_predictions.append({
                "predicted": float(pred_score),
                "actual": float(actual_score),
                "close": float(close_price),
            })

        # Per-window metrics
        if not scored:
            per_window_results.append(0.0)
            continue

        buy_prices = [close for pred, _, close in scored if pred >= threshold]
        dca_avg = np.mean([close for _, _, close in scored])
        if len(buy_prices) >= 3:
            improvement = (dca_avg - np.mean(buy_prices)) / dca_avg * 100
        else:
            improvement = 0.0
        per_window_results.append(round(improvement, 1))
        print(f" Window {w+1}/{n_windows}: {len(buy_prices)} signals, improvement={improvement:.1f}%")

    if last_pca is not None and config.get("features", {}).get("use_pca", False):
        effective_feature_names = [f"PC_{i+1}" for i in range(effective_n_features)]
    else:
        effective_feature_names = feature_cols

    if feature_importances_sum is None:
        feature_importances_sum = np.zeros(len(effective_feature_names))

    return compile_results(all_predictions, per_window_results,
                           feature_importances_sum, fi_count,
                           effective_feature_names, config)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Train & Predict Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _train_and_predict_window(X_train, y_train, X_val, y_val, X_test,
                              config, n_features, w_idx, n_windows):
    """Fit one window's model and predict its test set.

    ``"hybrid"`` is delegated to ``_hybrid_train_and_predict``. The LSTM is
    fitted with the validation split; every other model is fitted on the
    training split only and has its predictions clipped to 0-100 here.

    Returns:
        ``(predictions, feature_importances)``. A failure while fitting or
        predicting is reported on stderr and yields ``(empty array, None)``.
    """
    model_type = config.get("model_type", "xgboost")

    if model_type == "hybrid":
        return _hybrid_train_and_predict(
            X_train, y_train, X_val, y_val, X_test,
            config, n_features
        )

    is_lstm = model_type == "lstm"
    model = build_model(config, input_size=n_features) if is_lstm else build_model(config)

    try:
        if is_lstm:
            model.fit(X_train, y_train, X_val, y_val)
        else:
            model.fit(X_train, y_train)
    except Exception as e:
        print(f" Window {w_idx+1}: training failed -- {e}", file=sys.stderr)
        return np.array([]), None

    try:
        preds = model.predict(X_test)
        if not is_lstm:
            # LSTMRegressor.predict already clamps its output
            preds = np.clip(preds, 0, 100)
    except Exception as e:
        print(f" Window {w_idx+1}: prediction failed -- {e}", file=sys.stderr)
        return np.array([]), None

    return preds, _extract_feature_importances(model, n_features)
|
|
|
|
|
|
def _hybrid_train_and_predict(X_train, y_train, X_val, y_val, X_test,
                              config, n_features):
    """Hybrid: equal-weight average of LSTM and XGBoost predictions.

    The LSTM yields no prediction for the first ``lstm_sequence_length`` test
    rows, so the XGBoost predictions are trimmed by the same amount before
    averaging.

    If exactly one model fails, the surviving model's predictions are
    returned on their own. Averaging them with a constant 50.0 placeholder
    would squeeze every score into [25, 75], so the default buy thresholds
    (80 / 70) could never be reached. Only when both models fail is a
    neutral 50.0 returned.

    Returns:
        ``(combined_predictions, xgboost_feature_importances)``.
    """
    hp = config.get("hyperparameters", {})
    seq_len = hp.get("lstm_sequence_length", 30)

    # Train LSTM
    lstm_model = build_model({**config, "model_type": "lstm"}, input_size=n_features)
    lstm_preds = None
    try:
        lstm_model.fit(X_train, y_train, X_val, y_val)
        lstm_preds = lstm_model.predict(X_test)
    except Exception as e:
        print(f" Hybrid LSTM failed: {e}", file=sys.stderr)

    # Train XGBoost
    xgb_model = build_model({**config, "model_type": "xgboost"})
    xgb_preds = None
    try:
        xgb_model.fit(X_train, y_train)
        # Align: LSTM output is shorter by seq_len
        xgb_preds = np.clip(xgb_model.predict(X_test), 0, 100)[seq_len:]
    except Exception as e:
        print(f" Hybrid XGBoost failed: {e}", file=sys.stderr)

    if lstm_preds is None and xgb_preds is None:
        combined = np.full(max(0, len(X_test) - seq_len), 50.0)
    elif lstm_preds is None:
        combined = xgb_preds
    elif xgb_preds is None:
        combined = lstm_preds
    else:
        min_len = min(len(lstm_preds), len(xgb_preds))
        # Average (equal weight)
        combined = 0.5 * lstm_preds[:min_len] + 0.5 * xgb_preds[:min_len]

    combined = np.clip(combined, 0, 100)

    # Only the tree model exposes importances; this is None if it failed.
    fi = _extract_feature_importances(xgb_model, n_features)
    return combined, fi
|
|
|
|
|
|
def _extract_feature_importances(model, n_features):
|
|
"""Extract normalized feature importances from a model."""
|
|
try:
|
|
if hasattr(model, "feature_importances_"):
|
|
fi = model.feature_importances_
|
|
elif hasattr(model, "get_booster"):
|
|
fi_dict = model.get_booster().get_score(importance_type="gain")
|
|
fi = np.array([fi_dict.get(f"f{i}", 0) for i in range(n_features)])
|
|
else:
|
|
return None
|
|
return fi / (fi.sum() + 1e-10)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Results Compilation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def compile_results(predictions, per_window_cost_improvement,
                    fi_sum, fi_count, feature_cols, config):
    """Compile accumulation signal results into output JSON.

    Args:
        predictions: Sequence of dicts, each providing the keys
            ``"predicted"``, ``"actual"`` and ``"close"``.
        per_window_cost_improvement: Passed through unchanged into the result
            under ``"per_window_cost_improvement"``.
        fi_sum: Accumulated feature importances; divided by ``fi_count`` and
            paired element-wise with ``feature_cols``.
            NOTE(review): assumed to be a numeric array aligned with
            ``feature_cols`` -- confirm against the caller.
        fi_count: Number of importance vectors accumulated into ``fi_sum``
            (values below 1 are treated as 1 to avoid dividing by zero).
        feature_cols: Feature names, in the same order as ``fi_sum``.
        config: Run configuration; reads ``strategy.strong_buy_threshold``
            (default 80), ``strategy.good_buy_threshold`` (default 70) and
            ``model_type`` (default ``"unknown"``).

    Returns:
        A dict of summary metrics. When ``predictions`` is empty, the zeroed
        payload from ``_empty_results`` is returned with ``model_type``
        filled in from ``config``.
    """
    strategy = config.get("strategy", {})
    strong_threshold = strategy.get("strong_buy_threshold", 80)
    good_threshold = strategy.get("good_buy_threshold", 70)
    model_type = config.get("model_type", "unknown")

    if not predictions:
        # Keep the real model type even when there is nothing to score.
        return {**_empty_results(per_window_cost_improvement),
                "model_type": model_type}

    def _masked_mean(values, mask, default):
        """Mean of ``values[mask]`` as a float, or ``default`` if mask is empty."""
        return float(np.mean(values[mask])) if np.any(mask) else default

    pred_scores = np.array([p["predicted"] for p in predictions])
    actual_scores = np.array([p["actual"] for p in predictions])
    close_prices = np.array([p["close"] for p in predictions])
    total_candles = len(predictions)  # > 0: the empty case returned above

    # --- Signal Quality: STRONG BUY (score >= strong_threshold) ---
    # Real forward returns are not available here, so the actual score is
    # used as a proxy for buy quality: a strong-buy signal counts as
    # "profitable" when its actual score is above the midpoint (50).
    strong_buy_mask = pred_scores >= strong_threshold
    strong_buy_count = int(np.sum(strong_buy_mask))
    avg_quality_strong = _masked_mean(actual_scores, strong_buy_mask, 0.0)
    pct_profitable_strong = _masked_mean(actual_scores > 50, strong_buy_mask, 0.0)

    # --- Cost Basis Comparison vs DCA ---
    # DCA baseline buys every candle; the model buys only candles whose
    # predicted score is >= good_threshold.
    dca_avg = float(np.mean(close_prices))
    good_buy_prices = close_prices[pred_scores >= good_threshold]
    good_buy_count = len(good_buy_prices)

    if good_buy_count >= 3:
        model_avg = float(np.mean(good_buy_prices))
        cost_basis_improvement = (dca_avg - model_avg) / dca_avg * 100
    else:
        # Fewer than 3 buys: report no improvement rather than a noisy one.
        model_avg = dca_avg
        cost_basis_improvement = 0.0

    # --- Signal Frequency ---
    signal_frequency = strong_buy_count / total_candles * 100

    # --- Score at actual extremes ---
    # "Actual bottoms" = candles with actual score > 85;
    # "actual tops" = candles with actual score < 15.
    avg_score_at_bottoms = _masked_mean(pred_scores, actual_scores > 85, 0.0)
    avg_score_at_tops = _masked_mean(pred_scores, actual_scores < 15, 50.0)

    # --- Model R2 Score ---
    ss_res = np.sum((actual_scores - pred_scores) ** 2)
    ss_tot = np.sum((actual_scores - np.mean(actual_scores)) ** 2)
    r2 = 1 - ss_res / (ss_tot + 1e-10)

    # --- Feature Importances (30 largest averages, descending) ---
    fi_avg = fi_sum / max(fi_count, 1)
    fi_sorted = sorted(zip(feature_cols, fi_avg), key=lambda x: -x[1])
    feature_importances = {name: round(float(val), 4) for name, val in fi_sorted[:30]}

    # --- Score Distribution ---
    # Bins are half-open [lo, hi); the last bin's bound is widened to 101 so
    # a score of exactly 100 is counted.
    bins = [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100)]
    score_distribution = {}
    for lo, hi in bins:
        upper = hi if hi < 100 else 101
        in_bin = (pred_scores >= lo) & (pred_scores < upper)
        score_distribution[f"{lo}-{hi}"] = int(np.sum(in_bin))

    return {
        "cost_basis_improvement_pct": round(cost_basis_improvement, 2),
        "avg_cost_basis_model": round(model_avg, 2),
        "avg_cost_basis_dca": round(dca_avg, 2),
        "strong_buy_signal_count": strong_buy_count,
        "good_buy_signal_count": good_buy_count,
        "total_candles_tested": total_candles,
        "signal_frequency_pct": round(signal_frequency, 2),
        "pct_quality_strong_buy": round(pct_profitable_strong, 3),
        "avg_score_at_actual_bottoms": round(avg_score_at_bottoms, 1),
        "avg_score_at_actual_tops": round(avg_score_at_tops, 1),
        "model_r2_score": round(float(r2), 4),
        "avg_quality_score_strong_buy": round(avg_quality_strong, 1),
        "feature_importances": feature_importances,
        "per_window_cost_improvement": per_window_cost_improvement,
        "score_distribution": score_distribution,
        "model_type": model_type,
    }
|
|
|
|
|
|
def _empty_results(per_window, model_type="unknown"):
    """Return the zeroed results payload used when there are no predictions.

    The keys and their order mirror the dict returned by ``compile_results``
    so consumers see one schema either way.

    Args:
        per_window: Stored unchanged under ``"per_window_cost_improvement"``.
        model_type: Value reported under ``"model_type"``. Defaults to
            ``"unknown"``, which is what existing one-argument callers get.

    Returns:
        A new dict with every numeric metric set to zero and empty dicts for
        ``"feature_importances"`` and ``"score_distribution"``.
    """
    return {
        "cost_basis_improvement_pct": 0.0,
        "avg_cost_basis_model": 0.0,
        "avg_cost_basis_dca": 0.0,
        "strong_buy_signal_count": 0,
        "good_buy_signal_count": 0,
        "total_candles_tested": 0,
        "signal_frequency_pct": 0.0,
        "pct_quality_strong_buy": 0.0,
        "avg_score_at_actual_bottoms": 0.0,
        "avg_score_at_actual_tops": 0.0,
        "model_r2_score": 0.0,
        "avg_quality_score_strong_buy": 0.0,
        "feature_importances": {},
        "per_window_cost_improvement": per_window,
        "score_distribution": {},
        "model_type": model_type,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """Command-line entry point: load data, build features, train, save results.

    Reads the JSON config (``--config``) and the OHLCV CSV (``--data``, which
    must contain a ``timestamp`` column), then calls ``compute_features``,
    ``create_accumulation_target`` and ``walk_forward_train_test`` in turn and
    writes the resulting dict as JSON to ``--output``. Progress and a short
    metric summary are printed to stdout.

    Raises:
        SystemExit: On invalid command-line arguments (raised by argparse) or
            when the input CSV contains no rows.
    """
    parser = argparse.ArgumentParser(description="BTC Accumulation Signal Optimizer")
    parser.add_argument("--config", required=True, help="Path to config JSON")
    parser.add_argument("--data", required=True, help="Path to OHLCV CSV")
    parser.add_argument("--output", required=True, help="Path to output results JSON")
    args = parser.parse_args()

    # JSON is UTF-8; do not depend on the platform's locale encoding.
    with open(args.config, encoding="utf-8") as f:
        config = json.load(f)

    print(f"Loading data from {args.data}...")
    df = pd.read_csv(args.data, parse_dates=["timestamp"])
    if df.empty:
        # Fail with a clear message instead of an IndexError on .iloc[0] below.
        sys.exit(f"No rows found in {args.data}")
    print(f"  {len(df)} rows, {df['timestamp'].iloc[0]} -> {df['timestamp'].iloc[-1]}")

    print("Computing accumulation features...")
    df = compute_features(df, config)

    print("Creating accumulation score targets...")
    df["target"] = create_accumulation_target(df, config)

    # Every column other than the raw OHLCV fields, timestamp and target is
    # treated as a feature; rows with a NaN in any feature or target are dropped.
    non_feature_cols = ["timestamp", "open", "high", "low", "close", "volume", "target"]
    feature_cols = [c for c in df.columns if c not in non_feature_cols]
    df = df.dropna(subset=feature_cols + ["target"]).reset_index(drop=True)
    print(f"  {len(df)} rows after cleanup, {len(feature_cols)} features")

    target_stats = df["target"].describe()
    print(f"  Target stats: mean={target_stats['mean']:.1f}, std={target_stats['std']:.1f}, "
          f"min={target_stats['min']:.1f}, max={target_stats['max']:.1f}")

    model_type = config.get("model_type", "xgboost")
    rolling = config.get("training", {}).get("rolling_window", True)
    print(f"\nModel: {model_type}, Rolling: {rolling}")
    print("Running accumulation signal optimization...")
    results = walk_forward_train_test(df, feature_cols, config)

    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)

    print(f"\nResults saved to {args.output}")
    print(f"  Cost Basis Improvement: {results['cost_basis_improvement_pct']:.1f}%")
    print(f"  Strong Buy Signals: {results['strong_buy_signal_count']}")
    print(f"  Signal Frequency: {results['signal_frequency_pct']:.1f}%")
    print(f"  Model R2: {results['model_r2_score']:.4f}")
    print(f"  Score at Bottoms: {results['avg_score_at_actual_bottoms']:.1f}")
    print(f"  Score at Tops: {results['avg_score_at_actual_tops']:.1f}")
|
|
|
|
|
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|