#!/usr/bin/env python3 """ BTC Accumulation Signal Optimizer -- Train & Backtest Engine Self-contained script that runs on the Windows PC with GPU. Predicts the best times to BUY BTC for long-term holding by scoring each candle with an Accumulation Score (0-100). Usage: python train_and_backtest.py --config config.json --data btc_4h.csv --output results.json """ import argparse import json import sys import warnings import numpy as np import pandas as pd from datetime import datetime import ta from ta.momentum import RSIIndicator, StochasticOscillator, WilliamsRIndicator, ROCIndicator from ta.trend import MACD, CCIIndicator, SMAIndicator, EMAIndicator from ta.volatility import BollingerBands, AverageTrueRange from ta.volume import OnBalanceVolumeIndicator from sklearn.preprocessing import StandardScaler from sklearn.decomposition import PCA warnings.filterwarnings("ignore") # --------------------------------------------------------------------------- # Feature Engineering # --------------------------------------------------------------------------- def compute_features(df, config): """Compute accumulation-focused features from OHLCV data.""" feat = config.get("features", {}) c, h, l, o, v = df["close"], df["high"], df["low"], df["open"], df["volume"] # --- Price Position --- if feat.get("use_price_position", True): # Distance from ATH (all-time high in dataset) as % rolling_ath = c.expanding().max() df["dist_from_ath_pct"] = (c - rolling_ath) / rolling_ath * 100 # Distance from 52-week high/low (using ~2190 4h candles = 365 days) period_52w = min(2190, len(df) - 1) if period_52w > 50: rolling_52w_high = h.rolling(period_52w, min_periods=50).max() rolling_52w_low = l.rolling(period_52w, min_periods=50).min() df["dist_from_52w_high_pct"] = (c - rolling_52w_high) / rolling_52w_high * 100 df["dist_from_52w_low_pct"] = (c - rolling_52w_low) / rolling_52w_low * 100 # Price vs SMA(50) and SMA(200) sma50 = SMAIndicator(c, window=50).sma_indicator() sma200 = SMAIndicator(c, window=200).sma_indicator() df["SMA_50"] = sma50 df["SMA_200"] = sma200 df["price_vs_sma50_pct"] = (c - sma50) / sma50 * 100 df["price_vs_sma200_pct"] = (c - sma200) / sma200 * 100 df["sma50_vs_sma200"] = (sma50 - sma200) / sma200 * 100 # golden/death cross # Price percentile over last 365 candles (~2190 for 4h, ~8760 for 1h) period_365 = min(2190, len(df) - 1) if period_365 > 50: df["price_percentile_365"] = c.rolling(period_365, min_periods=50).apply( lambda x: pd.Series(x).rank(pct=True).iloc[-1], raw=False ) # Additional SMAs for p in [10, 20]: sma = SMAIndicator(c, window=p).sma_indicator() df[f"price_vs_sma{p}_pct"] = (c - sma) / sma * 100 # --- Momentum / Oversold --- if feat.get("use_momentum", True): df["RSI_14"] = RSIIndicator(c, window=14).rsi() df["RSI_7"] = RSIIndicator(c, window=7).rsi() macd = MACD(c, window_slow=26, window_fast=12, window_sign=9) df["MACD_line"] = macd.macd() df["MACD_signal"] = macd.macd_signal() df["MACD_hist"] = macd.macd_diff() stoch = StochasticOscillator(h, l, c, window=14, smooth_window=3) df["stoch_k"] = stoch.stoch() df["stoch_d"] = stoch.stoch_signal() df["williams_r"] = WilliamsRIndicator(h, l, c, lbp=14).williams_r() df["ROC_30"] = ROCIndicator(c, window=30).roc() df["ROC_90"] = ROCIndicator(c, window=90).roc() # --- Volatility / Fear --- if feat.get("use_volatility", True): bb = BollingerBands(c, window=20, window_dev=2) df["BB_width"] = (bb.bollinger_hband() - bb.bollinger_lband()) / c df["BB_pctb"] = bb.bollinger_pband() df["price_vs_lower_bb"] = (c - bb.bollinger_lband()) / c atr = AverageTrueRange(h, l, c, window=14) df["ATR_14"] = atr.average_true_range() df["ATR_pct"] = df["ATR_14"] / c * 100 # Consecutive red candles is_red = (c < o).astype(int) df["consecutive_red"] = is_red.groupby( (is_red != is_red.shift()).cumsum() ).cumsum() * is_red # Max drawdown over last 30 candles if len(df) > 30: rolling_max_30 = c.rolling(30, min_periods=1).max() df["drawdown_30"] = (c - rolling_max_30) / rolling_max_30 * 100 # Historical volatility df["hist_volatility_20"] = c.pct_change().rolling(20).std() * np.sqrt(252) # --- Volume --- if feat.get("use_volume", True): df["OBV"] = OnBalanceVolumeIndicator(c, v).on_balance_volume() vol_sma20 = v.rolling(20).mean() df["volume_ratio"] = v / vol_sma20 # Volume on red vs green candles ratio (rolling 20) is_green = (c >= o).astype(float) is_red_f = (c < o).astype(float) green_vol = (v * is_green).rolling(20).sum() red_vol = (v * is_red_f).rolling(20).sum() df["red_green_vol_ratio"] = red_vol / (green_vol + 1e-10) # OBV trend (slope over 20 candles) df["OBV_slope"] = df["OBV"].diff(20) / (df["OBV"].rolling(20).mean().abs() + 1e-10) # --- Cycle --- if feat.get("use_cycle", True): # MA(50) vs MA(200) position (bull/bear regime) -- already computed if price_position if "sma50_vs_sma200" not in df.columns: sma50 = SMAIndicator(c, window=50).sma_indicator() sma200 = SMAIndicator(c, window=200).sma_indicator() df["sma50_vs_sma200"] = (sma50 - sma200) / sma200 * 100 # Days since last major drawdown (>20% from peak) rolling_peak = c.expanding().max() drawdown_from_peak = (c - rolling_peak) / rolling_peak major_dd = (drawdown_from_peak < -0.20).astype(int) # Count candles since last major drawdown dd_groups = major_dd.groupby((major_dd != major_dd.shift()).cumsum()) df["candles_since_major_dd"] = dd_groups.cumcount() # Reset to 0 at drawdown points, count up otherwise df.loc[major_dd == 1, "candles_since_major_dd"] = 0 return df # --------------------------------------------------------------------------- # Target: Accumulation Score # --------------------------------------------------------------------------- def create_accumulation_target(df, config): """Create accumulation score target based on forward returns. For each candle, compute actual forward returns at multiple horizons, rank them, and create a weighted accumulation score (0-100). Times when buying led to the best long-term returns get highest scores. """ tgt = config.get("target", {}) timeframe = config.get("timeframe", "4h") if timeframe == "1h": forward_periods = tgt.get("forward_periods_1h", [168, 720, 2160]) else: forward_periods = tgt.get("forward_periods_4h", [42, 180, 540]) weights = tgt.get("weights", [0.2, 0.3, 0.5]) # Ensure weights sum to 1 w_sum = sum(weights) weights = [w / w_sum for w in weights] close = df["close"].values n = len(close) # Compute forward returns for each horizon forward_returns = [] for period in forward_periods: fwd = np.full(n, np.nan) for i in range(n - period): fwd[i] = (close[i + period] - close[i]) / close[i] * 100 forward_returns.append(fwd) # Rank each forward return (percentile rank, 0-1) # Higher rank = better buy point (higher future return) ranked = [] for fwd in forward_returns: valid_mask = ~np.isnan(fwd) ranks = np.full(n, np.nan) valid_vals = fwd[valid_mask] if len(valid_vals) > 0: from scipy.stats import rankdata r = rankdata(valid_vals, method="average") / len(valid_vals) ranks[valid_mask] = r ranked.append(ranks) # Weighted combination of ranks -> accumulation score (0-100) score = np.zeros(n) valid = np.ones(n, dtype=bool) for r, w in zip(ranked, weights): nan_mask = np.isnan(r) valid &= ~nan_mask r_filled = np.where(nan_mask, 0, r) score += w * r_filled # Scale to 0-100 score = score * 100 score[~valid] = np.nan return pd.Series(score, index=df.index, name="target") # --------------------------------------------------------------------------- # LSTM Regressor (PyTorch) # --------------------------------------------------------------------------- def get_device(): """Detect best available device for PyTorch.""" import torch if torch.cuda.is_available(): return torch.device("cuda") return torch.device("cpu") class LSTMRegressor: """PyTorch LSTM for regression of accumulation scores.""" def __init__(self, input_size, hp): import torch import torch.nn as nn self.hp = hp self.device = get_device() self.sequence_length = hp.get("lstm_sequence_length", 30) hidden_size = hp.get("lstm_hidden_size", 128) num_layers = hp.get("lstm_num_layers", 2) dropout = hp.get("lstm_dropout", 0.3) class _LSTMNet(nn.Module): def __init__(self_net): super().__init__() self_net.lstm = nn.LSTM( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0.0, ) self_net.dropout = nn.Dropout(dropout) self_net.fc = nn.Linear(hidden_size, 1) def forward(self_net, x): lstm_out, _ = self_net.lstm(x) last_hidden = lstm_out[:, -1, :] out = self_net.dropout(last_hidden) out = self_net.fc(out) return out.squeeze(-1) self.model = _LSTMNet().to(self.device) self.feature_importances_ = None def _make_sequences(self, X, y=None): """Convert flat feature arrays into overlapping sequences.""" import torch seq_len = self.sequence_length sequences = [] targets = [] for i in range(seq_len, len(X)): sequences.append(X[i - seq_len:i]) if y is not None: targets.append(y[i]) X_seq = torch.FloatTensor(np.array(sequences)).to(self.device) if y is not None: y_seq = torch.FloatTensor(np.array(targets)).to(self.device) return X_seq, y_seq return X_seq def fit(self, X_train, y_train, X_val=None, y_val=None): import torch import torch.nn as nn lr = self.hp.get("learning_rate", 0.001) epochs = self.hp.get("lstm_epochs", 100) batch_size = self.hp.get("lstm_batch_size", 64) patience = self.hp.get("lstm_patience", 10) optimizer = torch.optim.Adam(self.model.parameters(), lr=lr) criterion = nn.MSELoss() X_seq, y_seq = self._make_sequences(X_train, y_train) if X_val is not None and y_val is not None: X_val_seq, y_val_seq = self._make_sequences(X_val, y_val) has_val = len(X_val_seq) > 0 else: has_val = False best_val_loss = float("inf") patience_counter = 0 best_state = None self.model.train() n_samples = len(X_seq) for epoch in range(epochs): perm = torch.randperm(n_samples) X_seq_shuffled = X_seq[perm] y_seq_shuffled = y_seq[perm] epoch_loss = 0.0 n_batches = 0 for start in range(0, n_samples, batch_size): end = min(start + batch_size, n_samples) X_batch = X_seq_shuffled[start:end] y_batch = y_seq_shuffled[start:end] optimizer.zero_grad() preds = self.model(X_batch) loss = criterion(preds, y_batch) loss.backward() torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0) optimizer.step() epoch_loss += loss.item() n_batches += 1 avg_loss = epoch_loss / max(n_batches, 1) if has_val: self.model.eval() with torch.no_grad(): val_preds = self.model(X_val_seq) val_loss = criterion(val_preds, y_val_seq).item() self.model.train() if val_loss < best_val_loss: best_val_loss = val_loss patience_counter = 0 best_state = {k: v.clone() for k, v in self.model.state_dict().items()} else: patience_counter += 1 if patience_counter >= patience: print(f" LSTM early stop at epoch {epoch+1}, val_loss={val_loss:.4f}") break if (epoch + 1) % 20 == 0: print(f" Epoch {epoch+1}/{epochs}: loss={avg_loss:.4f}, val_loss={val_loss:.4f}") else: if (epoch + 1) % 20 == 0: print(f" Epoch {epoch+1}/{epochs}: loss={avg_loss:.4f}") if best_state is not None: self.model.load_state_dict(best_state) def predict(self, X): import torch self.model.eval() X_seq = self._make_sequences(X) with torch.no_grad(): preds = self.model(X_seq).cpu().numpy() # Clamp to 0-100 return np.clip(preds, 0, 100) # --------------------------------------------------------------------------- # Model Building # --------------------------------------------------------------------------- def build_model(config, input_size=0): """Build regression model based on config.""" model_type = config.get("model_type", "xgboost") hp = config.get("hyperparameters", {}) if model_type == "lstm": return LSTMRegressor(input_size, hp) if model_type == "xgboost": import xgboost as xgb try: import torch gpu_available = torch.cuda.is_available() except ImportError: gpu_available = False params = { "learning_rate": hp.get("learning_rate", 0.01), "max_depth": hp.get("max_depth", 5), "n_estimators": hp.get("n_estimators", 500), "subsample": hp.get("subsample", 0.8), "colsample_bytree": hp.get("colsample_bytree", 0.8), "min_child_weight": hp.get("min_child_weight", 10), "gamma": hp.get("gamma", 0.3), "reg_alpha": hp.get("reg_alpha", 0.5), "reg_lambda": hp.get("reg_lambda", 3.0), "objective": "reg:squarederror", "eval_metric": "rmse", "random_state": 42, "device": "cuda" if gpu_available else "cpu", "verbosity": 0, } return xgb.XGBRegressor(**params) elif model_type == "lightgbm": import lightgbm as lgb params = { "learning_rate": hp.get("learning_rate", 0.01), "max_depth": hp.get("max_depth", 5), "n_estimators": hp.get("n_estimators", 500), "subsample": hp.get("subsample", 0.8), "colsample_bytree": hp.get("colsample_bytree", 0.8), "min_child_samples": hp.get("min_child_weight", 10), "reg_alpha": hp.get("reg_alpha", 0.5), "reg_lambda": hp.get("reg_lambda", 3.0), "objective": "regression", "metric": "rmse", "random_state": 42, "verbose": -1, } try: params["device"] = "gpu" return lgb.LGBMRegressor(**params) except Exception: params["device"] = "cpu" return lgb.LGBMRegressor(**params) elif model_type == "catboost": from catboost import CatBoostRegressor try: import torch gpu_available = torch.cuda.is_available() except ImportError: gpu_available = False params = { "learning_rate": hp.get("learning_rate", 0.01), "depth": hp.get("max_depth", 5), "iterations": hp.get("n_estimators", 500), "subsample": hp.get("subsample", 0.8), "l2_leaf_reg": hp.get("reg_lambda", 3.0), "loss_function": "RMSE", "random_seed": 42, "verbose": 0, "task_type": "GPU" if gpu_available else "CPU", } return CatBoostRegressor(**params) elif model_type == "ensemble": raise ValueError("Use 'hybrid' instead of 'ensemble' for accumulation mode") else: raise ValueError(f"Unknown model_type: {model_type}") # --------------------------------------------------------------------------- # Scaling & PCA # --------------------------------------------------------------------------- def apply_scaling_pca(X_train, X_val, X_test, config): """Apply StandardScaler and optional PCA.""" feat_cfg = config.get("features", {}) scaler = None pca = None if feat_cfg.get("use_scaler", True): scaler = StandardScaler() X_train = scaler.fit_transform(X_train) if X_val is not None: X_val = scaler.transform(X_val) X_test = scaler.transform(X_test) if feat_cfg.get("use_pca", False): variance = feat_cfg.get("pca_variance", 0.95) pca = PCA(n_components=variance, svd_solver="full") X_train = pca.fit_transform(X_train) if X_val is not None: X_val = pca.transform(X_val) X_test = pca.transform(X_test) print(f" PCA: {pca.n_components_} components (retaining {variance*100:.0f}% variance)") return X_train, X_val, X_test, scaler, pca # --------------------------------------------------------------------------- # Rolling Window Validation # --------------------------------------------------------------------------- def rolling_window_train_test(df, feature_cols, config): """Rolling window train/test for accumulation score prediction.""" training_cfg = config.get("training", {}) train_size = training_cfg.get("rolling_train_size", 2500) test_size = training_cfg.get("rolling_test_size", 300) val_pct = training_cfg.get("validation_pct", 0.15) model_type = config.get("model_type", "xgboost") n = len(df) all_predictions = [] # list of (predicted_score, actual_score, close_price) per_window_results = [] feature_importances_sum = None fi_count = 0 effective_n_features = len(feature_cols) window_count = 0 last_pca = None start = 0 while start + train_size + test_size <= n: train_end = start + train_size test_end = min(train_end + test_size, n) train_full = df.iloc[start:train_end] test_df = df.iloc[train_end:test_end] if len(test_df) < 10: start += test_size continue # Check target has variance train_target = train_full["target"].dropna() if len(train_target) < 100 or train_target.std() < 1.0: start += test_size continue # Split train into train/val val_split = int(len(train_full) * (1.0 - val_pct)) train_df = train_full.iloc[:val_split] val_df = train_full.iloc[val_split:] X_train = train_df[feature_cols].values y_train = train_df["target"].values X_val = val_df[feature_cols].values y_val = val_df["target"].values X_test = test_df[feature_cols].values y_test = test_df["target"].values # Scale and PCA X_train, X_val, X_test, scaler, pca = apply_scaling_pca(X_train, X_val, X_test, config) last_pca = pca current_n_features = X_train.shape[1] if feature_importances_sum is None: effective_n_features = current_n_features feature_importances_sum = np.zeros(effective_n_features) window_count += 1 total_possible = (n - train_size) // test_size # Train and predict preds, fi = _train_and_predict_window( X_train, y_train, X_val, y_val, X_test, config, current_n_features, window_count - 1, total_possible ) if fi is not None and len(fi) == effective_n_features: feature_importances_sum += fi fi_count += 1 # Align predictions with test data if model_type in ("lstm", "hybrid"): seq_len = config.get("hyperparameters", {}).get("lstm_sequence_length", 30) offset = seq_len if model_type == "lstm" else seq_len if len(preds) < len(test_df): test_aligned = test_df.iloc[offset:offset + len(preds)] else: test_aligned = test_df.iloc[:len(preds)] else: test_aligned = test_df.iloc[:len(preds)] min_len = min(len(preds), len(test_aligned)) preds = preds[:min_len] test_aligned = test_aligned.iloc[:min_len] for pred_score, actual_score, close_price in zip( preds, test_aligned["target"].values, test_aligned["close"].values ): if not np.isnan(actual_score): all_predictions.append({ "predicted": float(pred_score), "actual": float(actual_score), "close": float(close_price), }) # Per-window cost improvement window_preds = [p for p in zip(preds, test_aligned["target"].values, test_aligned["close"].values) if not np.isnan(p[1])] if window_preds: wp_pred = [p[0] for p in window_preds] wp_close = [p[2] for p in window_preds] dca_avg = np.mean(wp_close) # Model buys: only when predicted score > strong_buy_threshold threshold = config.get("strategy", {}).get("strong_buy_threshold", 80) buy_prices = [close for pred, _, close in window_preds if pred >= threshold] if len(buy_prices) >= 3: model_avg = np.mean(buy_prices) improvement = (dca_avg - model_avg) / dca_avg * 100 else: improvement = 0.0 per_window_results.append(round(improvement, 1)) print(f" Window {window_count}: {len(buy_prices)} signals, cost improvement={improvement:.1f}%") else: per_window_results.append(0.0) print(f" Window {window_count}: no valid predictions") start += test_size # Build feature names if last_pca is not None and config.get("features", {}).get("use_pca", False): effective_feature_names = [f"PC_{i+1}" for i in range(effective_n_features)] else: effective_feature_names = feature_cols if feature_importances_sum is None: feature_importances_sum = np.zeros(len(effective_feature_names)) return compile_results(all_predictions, per_window_results, feature_importances_sum, fi_count, effective_feature_names, config) def walk_forward_train_test(df, feature_cols, config): """Walk-forward or rolling window validation.""" training_cfg = config.get("training", {}) if training_cfg.get("rolling_window", True): return rolling_window_train_test(df, feature_cols, config) # Static walk-forward: split into N windows n_windows = training_cfg.get("walk_forward_windows", 5) train_pct = training_cfg.get("train_pct", 0.7) val_pct = training_cfg.get("validation_pct", 0.15) n = len(df) window_size = n // n_windows all_predictions = [] per_window_results = [] feature_importances_sum = None fi_count = 0 effective_n_features = len(feature_cols) last_pca = None for w in range(n_windows): w_start = w * window_size w_end = min((w + 1) * window_size + int(window_size * 0.3), n) if w_end > n: w_end = n window_data = df.iloc[w_start:w_end].copy() wn = len(window_data) train_end = int(wn * train_pct) val_end = int(wn * (train_pct + val_pct)) train_df = window_data.iloc[:train_end] val_df = window_data.iloc[train_end:val_end] test_df = window_data.iloc[val_end:] if len(test_df) < 10: continue train_target = train_df["target"].dropna() if len(train_target) < 100 or train_target.std() < 1.0: continue X_train = train_df[feature_cols].values y_train = train_df["target"].values X_val = val_df[feature_cols].values y_val = val_df["target"].values X_test = test_df[feature_cols].values X_train, X_val, X_test, scaler, pca = apply_scaling_pca(X_train, X_val, X_test, config) last_pca = pca current_n_features = X_train.shape[1] if feature_importances_sum is None: effective_n_features = current_n_features feature_importances_sum = np.zeros(effective_n_features) preds, fi = _train_and_predict_window( X_train, y_train, X_val, y_val, X_test, config, current_n_features, w, n_windows ) if fi is not None and len(fi) == effective_n_features: feature_importances_sum += fi fi_count += 1 model_type = config.get("model_type", "xgboost") if model_type in ("lstm", "hybrid"): seq_len = config.get("hyperparameters", {}).get("lstm_sequence_length", 30) test_aligned = test_df.iloc[seq_len:seq_len + len(preds)] else: test_aligned = test_df.iloc[:len(preds)] min_len = min(len(preds), len(test_aligned)) preds = preds[:min_len] test_aligned = test_aligned.iloc[:min_len] for pred_score, actual_score, close_price in zip( preds, test_aligned["target"].values, test_aligned["close"].values ): if not np.isnan(actual_score): all_predictions.append({ "predicted": float(pred_score), "actual": float(actual_score), "close": float(close_price), }) # Per-window metrics window_preds = [p for p in zip(preds, test_aligned["target"].values, test_aligned["close"].values) if not np.isnan(p[1])] if window_preds: threshold = config.get("strategy", {}).get("strong_buy_threshold", 80) wp_close = [p[2] for p in window_preds] buy_prices = [close for pred, _, close in window_preds if pred >= threshold] dca_avg = np.mean(wp_close) if len(buy_prices) >= 3: improvement = (dca_avg - np.mean(buy_prices)) / dca_avg * 100 else: improvement = 0.0 per_window_results.append(round(improvement, 1)) print(f" Window {w+1}/{n_windows}: {len(buy_prices)} signals, improvement={improvement:.1f}%") else: per_window_results.append(0.0) if last_pca is not None and config.get("features", {}).get("use_pca", False): effective_feature_names = [f"PC_{i+1}" for i in range(effective_n_features)] else: effective_feature_names = feature_cols if feature_importances_sum is None: feature_importances_sum = np.zeros(len(effective_feature_names)) return compile_results(all_predictions, per_window_results, feature_importances_sum, fi_count, effective_feature_names, config) # --------------------------------------------------------------------------- # Train & Predict Helpers # --------------------------------------------------------------------------- def _train_and_predict_window(X_train, y_train, X_val, y_val, X_test, config, n_features, w_idx, n_windows): """Train model and return predictions + feature importances.""" model_type = config.get("model_type", "xgboost") if model_type == "hybrid": return _hybrid_train_and_predict( X_train, y_train, X_val, y_val, X_test, config, n_features ) if model_type == "lstm": model = build_model(config, input_size=n_features) else: model = build_model(config) try: if model_type == "lstm": model.fit(X_train, y_train, X_val, y_val) else: model.fit(X_train, y_train) except Exception as e: print(f" Window {w_idx+1}: training failed -- {e}", file=sys.stderr) return np.array([]), None try: if model_type == "lstm": preds = model.predict(X_test) else: preds = model.predict(X_test) preds = np.clip(preds, 0, 100) except Exception as e: print(f" Window {w_idx+1}: prediction failed -- {e}", file=sys.stderr) return np.array([]), None fi = _extract_feature_importances(model, n_features) return preds, fi def _hybrid_train_and_predict(X_train, y_train, X_val, y_val, X_test, config, n_features): """Hybrid: average of LSTM + XGBoost regression predictions.""" hp = config.get("hyperparameters", {}) seq_len = hp.get("lstm_sequence_length", 30) # Train LSTM lstm_config = {**config, "model_type": "lstm"} lstm_model = build_model(lstm_config, input_size=n_features) try: lstm_model.fit(X_train, y_train, X_val, y_val) lstm_preds = lstm_model.predict(X_test) except Exception as e: print(f" Hybrid LSTM failed: {e}", file=sys.stderr) lstm_preds = np.full(max(0, len(X_test) - seq_len), 50.0) # Train XGBoost xgb_config = {**config, "model_type": "xgboost"} xgb_model = build_model(xgb_config) try: xgb_model.fit(X_train, y_train) xgb_preds_full = xgb_model.predict(X_test) xgb_preds_full = np.clip(xgb_preds_full, 0, 100) except Exception as e: print(f" Hybrid XGBoost failed: {e}", file=sys.stderr) xgb_preds_full = np.full(len(X_test), 50.0) # Align: LSTM output is shorter by seq_len xgb_preds = xgb_preds_full[seq_len:] min_len = min(len(lstm_preds), len(xgb_preds)) lstm_preds = lstm_preds[:min_len] xgb_preds = xgb_preds[:min_len] # Average (equal weight) combined = 0.5 * lstm_preds + 0.5 * xgb_preds combined = np.clip(combined, 0, 100) fi = _extract_feature_importances(xgb_model, n_features) return combined, fi def _extract_feature_importances(model, n_features): """Extract normalized feature importances from a model.""" try: if hasattr(model, "feature_importances_"): fi = model.feature_importances_ elif hasattr(model, "get_booster"): fi_dict = model.get_booster().get_score(importance_type="gain") fi = np.array([fi_dict.get(f"f{i}", 0) for i in range(n_features)]) else: return None return fi / (fi.sum() + 1e-10) except Exception: return None # --------------------------------------------------------------------------- # Results Compilation # --------------------------------------------------------------------------- def compile_results(predictions, per_window_cost_improvement, fi_sum, fi_count, feature_cols, config): """Compile accumulation signal results into output JSON.""" strategy = config.get("strategy", {}) strong_threshold = strategy.get("strong_buy_threshold", 80) good_threshold = strategy.get("good_buy_threshold", 70) poor_threshold = strategy.get("poor_threshold", 30) if not predictions: return _empty_results(per_window_cost_improvement) pred_scores = np.array([p["predicted"] for p in predictions]) actual_scores = np.array([p["actual"] for p in predictions]) close_prices = np.array([p["close"] for p in predictions]) total_candles = len(predictions) # --- Signal Quality: STRONG BUY (score > strong_threshold) --- strong_buy_mask = pred_scores >= strong_threshold strong_buy_count = int(np.sum(strong_buy_mask)) if strong_buy_count > 0: strong_buy_actual = actual_scores[strong_buy_mask] # Actual scores correspond to forward return quality avg_actual_strong = float(np.mean(strong_buy_actual)) else: avg_actual_strong = 0.0 # We need forward return info. Since actual_score is a rank-based measure (0-100), # and we want to report real forward returns, we approximate: # actual_score > 80 means the buy was in the top 20% of quality. # For actual forward return stats, we use actual score as a proxy. # Profitable signals: those where actual score is also above median (50) if strong_buy_count > 0: pct_profitable_strong = float(np.mean(actual_scores[strong_buy_mask] > 50)) else: pct_profitable_strong = 0.0 # --- Cost Basis Comparison vs DCA --- dca_avg = float(np.mean(close_prices)) # Model strategy: buy when predicted score >= good_threshold good_buy_mask = pred_scores >= good_threshold good_buy_prices = close_prices[good_buy_mask] good_buy_count = len(good_buy_prices) if good_buy_count >= 3: model_avg = float(np.mean(good_buy_prices)) cost_basis_improvement = (dca_avg - model_avg) / dca_avg * 100 else: model_avg = dca_avg cost_basis_improvement = 0.0 # --- Signal Frequency --- signal_frequency = strong_buy_count / total_candles * 100 if total_candles > 0 else 0 # --- Score at actual extremes --- # "Actual bottoms" = candles with actual score > 85 (top 15% buy opportunities) actual_bottom_mask = actual_scores > 85 if np.any(actual_bottom_mask): avg_score_at_bottoms = float(np.mean(pred_scores[actual_bottom_mask])) else: avg_score_at_bottoms = 0.0 # "Actual tops" = candles with actual score < 15 (worst 15% buy times) actual_top_mask = actual_scores < 15 if np.any(actual_top_mask): avg_score_at_tops = float(np.mean(pred_scores[actual_top_mask])) else: avg_score_at_tops = 50.0 # --- Model R2 Score --- ss_res = np.sum((actual_scores - pred_scores) ** 2) ss_tot = np.sum((actual_scores - np.mean(actual_scores)) ** 2) r2 = 1 - ss_res / (ss_tot + 1e-10) # --- Feature Importances --- fi_avg = fi_sum / max(fi_count, 1) fi_sorted = sorted(zip(feature_cols, fi_avg), key=lambda x: -x[1]) feature_importances = {name: round(float(val), 4) for name, val in fi_sorted[:30]} # --- Score Distribution --- bins = [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100)] score_distribution = {} for lo, hi in bins: key = f"{lo}-{hi}" count = int(np.sum((pred_scores >= lo) & (pred_scores < (hi if hi < 100 else 101)))) score_distribution[key] = count # --- Forward return approximation from actual scores --- # Map actual score to approximate return quality # Score 90+ = historically best 10% buys, score 10- = worst 10% # Use actual score as proxy for "quality rank" if strong_buy_count > 0: # Average actual quality score for strong buy signals avg_quality_strong = float(np.mean(actual_scores[strong_buy_mask])) # Estimate: top quality signals should be 70+ quality_good = avg_quality_strong > 60 else: avg_quality_strong = 0.0 quality_good = False return { "cost_basis_improvement_pct": round(cost_basis_improvement, 2), "avg_cost_basis_model": round(model_avg, 2), "avg_cost_basis_dca": round(dca_avg, 2), "strong_buy_signal_count": strong_buy_count, "good_buy_signal_count": good_buy_count, "total_candles_tested": total_candles, "signal_frequency_pct": round(signal_frequency, 2), "pct_quality_strong_buy": round(pct_profitable_strong, 3), "avg_score_at_actual_bottoms": round(avg_score_at_bottoms, 1), "avg_score_at_actual_tops": round(avg_score_at_tops, 1), "model_r2_score": round(float(r2), 4), "avg_quality_score_strong_buy": round(avg_quality_strong, 1), "feature_importances": feature_importances, "per_window_cost_improvement": per_window_cost_improvement, "score_distribution": score_distribution, "model_type": config.get("model_type", "unknown"), } def _empty_results(per_window): return { "cost_basis_improvement_pct": 0.0, "avg_cost_basis_model": 0.0, "avg_cost_basis_dca": 0.0, "strong_buy_signal_count": 0, "good_buy_signal_count": 0, "total_candles_tested": 0, "signal_frequency_pct": 0.0, "pct_quality_strong_buy": 0.0, "avg_score_at_actual_bottoms": 0.0, "avg_score_at_actual_tops": 0.0, "model_r2_score": 0.0, "avg_quality_score_strong_buy": 0.0, "feature_importances": {}, "per_window_cost_improvement": per_window, "score_distribution": {}, "model_type": "unknown", } # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser(description="BTC Accumulation Signal Optimizer") parser.add_argument("--config", required=True, help="Path to config JSON") parser.add_argument("--data", required=True, help="Path to OHLCV CSV") parser.add_argument("--output", required=True, help="Path to output results JSON") args = parser.parse_args() with open(args.config) as f: config = json.load(f) print(f"Loading data from {args.data}...") df = pd.read_csv(args.data, parse_dates=["timestamp"]) print(f" {len(df)} rows, {df['timestamp'].iloc[0]} -> {df['timestamp'].iloc[-1]}") print("Computing accumulation features...") df = compute_features(df, config) print("Creating accumulation score targets...") df["target"] = create_accumulation_target(df, config) # Drop NaN rows feature_cols = [c for c in df.columns if c not in ["timestamp", "open", "high", "low", "close", "volume", "target"]] df = df.dropna(subset=feature_cols + ["target"]).reset_index(drop=True) print(f" {len(df)} rows after cleanup, {len(feature_cols)} features") target_stats = df["target"].describe() print(f" Target stats: mean={target_stats['mean']:.1f}, std={target_stats['std']:.1f}, " f"min={target_stats['min']:.1f}, max={target_stats['max']:.1f}") model_type = config.get("model_type", "xgboost") rolling = config.get("training", {}).get("rolling_window", True) print(f"\nModel: {model_type}, Rolling: {rolling}") print("Running accumulation signal optimization...") results = walk_forward_train_test(df, feature_cols, config) with open(args.output, "w") as f: json.dump(results, f, indent=2) print(f"\nResults saved to {args.output}") print(f" Cost Basis Improvement: {results['cost_basis_improvement_pct']:.1f}%") print(f" Strong Buy Signals: {results['strong_buy_signal_count']}") print(f" Signal Frequency: {results['signal_frequency_pct']:.1f}%") print(f" Model R2: {results['model_r2_score']:.4f}") print(f" Score at Bottoms: {results['avg_score_at_actual_bottoms']:.1f}") print(f" Score at Tops: {results['avg_score_at_actual_tops']:.1f}") if __name__ == "__main__": main()