#!/usr/bin/env python3
"""
BTC ML Trading Strategy -- Train & Backtest Engine
Self-contained script that runs on the Windows PC with GPU.

Usage:
    python train_and_backtest.py --config config.json --data btc_4h.csv --output results.json
"""

import argparse
import json
import sys
import warnings
import numpy as np
import pandas as pd
from datetime import datetime

import ta
from ta.momentum import RSIIndicator, StochasticOscillator, WilliamsRIndicator, ROCIndicator
from ta.trend import MACD, CCIIndicator, SMAIndicator, EMAIndicator
from ta.volatility import BollingerBands, AverageTrueRange, KeltnerChannel
from ta.volume import OnBalanceVolumeIndicator

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

warnings.filterwarnings("ignore")


# ---------------------------------------------------------------------------
# Feature Engineering
# ---------------------------------------------------------------------------

def compute_features(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    """Compute 60+ technical features from OHLCV data.

    Reads the ``open``, ``high``, ``low``, ``close`` and ``volume`` columns and
    adds the indicator columns to ``df`` in place; the same frame is returned.

    Args:
        df: OHLCV frame.
        config: Full run config; only ``config["features"]`` is read
            (``use_volume_features``, ``use_candle_patterns``,
            ``use_lag_features``, ``lag_periods``, ``lookback_periods``).

    Returns:
        ``df`` with the feature columns added. Warm-up rows contain NaN.
    """
    feat = config.get("features", {})
    c, h, l, o, v = df["close"], df["high"], df["low"], df["open"], df["volume"]

    # --- Price SMAs & EMAs ---
    for p in [5, 10, 20, 50, 200]:
        df[f"SMA_{p}"] = SMAIndicator(c, window=p).sma_indicator()
        df[f"price_vs_SMA_{p}"] = c / df[f"SMA_{p}"] - 1
    for p in [5, 10, 20, 50]:
        df[f"EMA_{p}"] = EMAIndicator(c, window=p).ema_indicator()

    # --- Momentum ---
    for p in [7, 14, 21]:
        df[f"RSI_{p}"] = RSIIndicator(c, window=p).rsi()

    macd = MACD(c, window_slow=26, window_fast=12, window_sign=9)
    df["MACD_line"] = macd.macd()
    df["MACD_signal"] = macd.macd_signal()
    df["MACD_hist"] = macd.macd_diff()

    stoch = StochasticOscillator(h, l, c, window=14, smooth_window=3)
    df["stoch_k"] = stoch.stoch()
    df["stoch_d"] = stoch.stoch_signal()

    df["williams_r"] = WilliamsRIndicator(h, l, c, lbp=14).williams_r()
    df["ROC_10"] = ROCIndicator(c, window=10).roc()
    df["CCI_20"] = CCIIndicator(h, l, c, window=20).cci()

    # --- Volatility ---
    bb = BollingerBands(c, window=20, window_dev=2)
    df["BB_upper"] = bb.bollinger_hband()
    df["BB_lower"] = bb.bollinger_lband()
    df["BB_width"] = (df["BB_upper"] - df["BB_lower"]) / c
    df["BB_pctb"] = bb.bollinger_pband()

    df["ATR_14"] = AverageTrueRange(h, l, c, window=14).average_true_range()
    df["ATR_pct"] = df["ATR_14"] / c

    kc = KeltnerChannel(h, l, c, window=20)
    df["keltner_upper"] = kc.keltner_channel_hband()
    df["keltner_lower"] = kc.keltner_channel_lband()

    df["hist_volatility"] = c.pct_change().rolling(20).std() * np.sqrt(252)

    # --- Volume ---
    if feat.get("use_volume_features", True):
        df["OBV"] = OnBalanceVolumeIndicator(c, v).on_balance_volume()
        df["volume_sma_20"] = v.rolling(20).mean()
        df["volume_ratio"] = v / df["volume_sma_20"]
        df["volume_momentum"] = v.pct_change(5)

        # VWAP approximation (rolling)
        tp = (h + l + c) / 3
        df["vwap_approx"] = (tp * v).rolling(20).sum() / v.rolling(20).sum()
        df["price_vs_vwap"] = c / df["vwap_approx"] - 1

    # --- Candle Patterns ---
    if feat.get("use_candle_patterns", True):
        body = (c - o).abs()
        # Flat candles (high == low) would divide by zero; make them NaN.
        full_range = (h - l).replace(0, np.nan)
        open_close = pd.concat([c, o], axis=1)
        df["candle_body_ratio"] = body / full_range
        df["upper_wick_ratio"] = (h - open_close.max(axis=1)) / full_range
        df["lower_wick_ratio"] = (open_close.min(axis=1) - l) / full_range
        df["is_bullish"] = (c > o).astype(int)

        # Consecutive up/down: running count within each run of equal values.
        up = df["is_bullish"]
        down = 1 - up
        df["consecutive_up"] = up.groupby((up != up.shift()).cumsum()).cumsum()
        df["consecutive_down"] = down.groupby((down != down.shift()).cumsum()).cumsum()

    # --- Lag Features ---
    if feat.get("use_lag_features", True):
        lag_periods = feat.get("lag_periods", [1, 2, 3, 5])
        for lag in lag_periods:
            df[f"return_lag_{lag}"] = c.pct_change(lag)
            df[f"volume_lag_{lag}"] = v.pct_change(lag)
            if "RSI_14" in df.columns:
                df[f"RSI_14_lag_{lag}"] = df["RSI_14"].shift(lag)

    # --- Lookback period features ---
    for p in feat.get("lookback_periods", [3, 5, 10, 20]):
        df[f"return_{p}"] = c.pct_change(p)
        df[f"volatility_{p}"] = c.pct_change().rolling(p).std()
        df[f"high_low_range_{p}"] = (h.rolling(p).max() - l.rolling(p).min()) / c

    return df


# ---------------------------------------------------------------------------
# Target Labeling
# ---------------------------------------------------------------------------

def create_target(df: pd.DataFrame, config: dict) -> pd.Series:
    """Create binary target: will price move >= threshold% within horizon?

    For each row the maximum / minimum *close* over the next
    ``horizon_candles`` rows is compared with the current close.

    Args:
        df: Frame with a ``close`` column.
        config: Full run config; ``config["target"]`` supplies
            ``horizon_candles`` (default 6), ``threshold_pct`` (default 1.0)
            and ``direction`` (``"long"``, ``"short"``, anything else = both).

    Returns:
        Float series of 1.0 / 0.0. Rows whose future window is incomplete
        (the last ``horizon_candles`` rows) are NaN rather than 0, so callers
        can drop them instead of training on fabricated negative labels.
    """
    tgt = config.get("target", {})
    horizon = tgt.get("horizon_candles", 6)
    threshold = tgt.get("threshold_pct", 1.0) / 100.0
    direction = tgt.get("direction", "long")

    close = df["close"]
    next_close = close.shift(-1)
    # rolling() looks backwards, so shift the result so that row i holds the
    # extreme of close[i+1 .. i+horizon].
    future_max = next_close.rolling(horizon).max().shift(-horizon + 1)
    future_min = next_close.rolling(horizon).min().shift(-horizon + 1)

    long_signal = (future_max / close) - 1 >= threshold
    short_signal = (close / future_min) - 1 >= threshold

    if direction == "long":
        hit = long_signal
    elif direction == "short":
        hit = short_signal
    else:  # both
        hit = long_signal | short_signal

    target = hit.astype(float)
    # Comparisons against NaN evaluate to False; mark those rows as unknown.
    target[future_max.isna() | future_min.isna()] = np.nan
    return target


# ---------------------------------------------------------------------------
# LSTM Model (PyTorch)
# ---------------------------------------------------------------------------

def get_device():
    """Detect best available device for PyTorch (CUDA if available, else CPU)."""
    import torch

    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")


class LSTMClassifier:
    """PyTorch LSTM for binary classification of trading signals.

    Exposes ``fit`` / ``predict_proba`` / ``predict`` so it can be used
    alongside the tree models. Because inputs are turned into sequences of
    ``lstm_sequence_length`` rows, predictions are produced for
    ``len(X) - sequence_length`` rows only (the first ``sequence_length`` rows
    have no full history).
    """

    def __init__(self, input_size, hp):
        """Build the network.

        Args:
            input_size: Number of features per time step.
            hp: Hyperparameter dict (``lstm_sequence_length``,
                ``lstm_hidden_size``, ``lstm_num_layers``, ``lstm_dropout``,
                plus the training keys read in ``fit``).
        """
        import torch
        import torch.nn as nn

        self.hp = hp
        self.device = get_device()
        self.sequence_length = hp.get("lstm_sequence_length", 20)

        hidden_size = hp.get("lstm_hidden_size", 128)
        num_layers = hp.get("lstm_num_layers", 2)
        dropout = hp.get("lstm_dropout", 0.3)

        class _LSTMNet(nn.Module):
            def __init__(self_net):
                super().__init__()
                self_net.lstm = nn.LSTM(
                    input_size=input_size,
                    hidden_size=hidden_size,
                    num_layers=num_layers,
                    batch_first=True,
                    # Inter-layer dropout only applies with more than one layer.
                    dropout=dropout if num_layers > 1 else 0.0,
                )
                self_net.dropout = nn.Dropout(dropout)
                self_net.fc = nn.Linear(hidden_size, 1)
                self_net.sigmoid = nn.Sigmoid()

            def forward(self_net, x):
                # x: (batch, seq_len, features)
                lstm_out, _ = self_net.lstm(x)
                # Take last time step
                last_hidden = lstm_out[:, -1, :]
                out = self_net.dropout(last_hidden)
                out = self_net.fc(out)
                out = self_net.sigmoid(out)
                return out.squeeze(-1)

        self.model = _LSTMNet().to(self.device)
        # Present for interface parity with tree models; never populated.
        self.feature_importances_ = None

    def _make_sequences(self, X, y=None):
        """Convert flat feature arrays into overlapping sequences.

        Sample ``i`` (for ``i >= sequence_length``) is built from rows
        ``X[i - sequence_length : i]`` and paired with ``y[i]``.

        Returns:
            ``X_seq`` or ``(X_seq, y_seq)`` as float tensors on ``self.device``.
            With ``len(X) <= sequence_length`` the tensors are empty.
        """
        import torch

        seq_len = self.sequence_length
        sequences = []
        targets = []
        for i in range(seq_len, len(X)):
            sequences.append(X[i - seq_len:i])
            if y is not None:
                targets.append(y[i])

        X_seq = torch.FloatTensor(np.array(sequences)).to(self.device)
        if y is not None:
            y_seq = torch.FloatTensor(np.array(targets)).to(self.device)
            return X_seq, y_seq
        return X_seq

    def fit(self, X_train, y_train, X_val=None, y_val=None):
        """Train with Adam + BCE loss, with early stopping on validation loss.

        Args:
            X_train, y_train: Training features / labels (row-aligned).
            X_val, y_val: Optional validation set. When it yields at least one
                sequence, the weights with the lowest validation loss are
                restored at the end.

        Raises:
            ValueError: If the training data is too short to form a sequence.
        """
        import torch
        import torch.nn as nn

        lr = self.hp.get("learning_rate", 0.001)
        epochs = self.hp.get("lstm_epochs", 100)
        batch_size = self.hp.get("lstm_batch_size", 64)
        patience = self.hp.get("lstm_patience", 10)

        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        criterion = nn.BCELoss()

        X_seq, y_seq = self._make_sequences(X_train, y_train)
        n_samples = len(X_seq)
        if n_samples == 0:
            # Otherwise an untrained, randomly initialised model would be used.
            raise ValueError(
                f"Need more than {self.sequence_length} training rows to build "
                f"LSTM sequences, got {len(X_train)}"
            )

        if X_val is not None and y_val is not None:
            X_val_seq, y_val_seq = self._make_sequences(X_val, y_val)
            has_val = len(X_val_seq) > 0
        else:
            has_val = False

        best_val_loss = float("inf")
        patience_counter = 0
        best_state = None

        self.model.train()
        for epoch in range(epochs):
            # Shuffle
            perm = torch.randperm(n_samples)
            X_seq = X_seq[perm]
            y_seq = y_seq[perm]

            epoch_loss = 0.0
            n_batches = 0
            for start in range(0, n_samples, batch_size):
                end = min(start + batch_size, n_samples)
                X_batch = X_seq[start:end]
                y_batch = y_seq[start:end]

                optimizer.zero_grad()
                preds = self.model(X_batch)
                loss = criterion(preds, y_batch)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                optimizer.step()

                epoch_loss += loss.item()
                n_batches += 1

            avg_loss = epoch_loss / max(n_batches, 1)

            # Validation
            if has_val:
                self.model.eval()
                with torch.no_grad():
                    val_preds = self.model(X_val_seq)
                    val_loss = criterion(val_preds, y_val_seq).item()
                self.model.train()

                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    patience_counter = 0
                    # clone(): state_dict tensors alias the live parameters.
                    best_state = {k: v.clone() for k, v in self.model.state_dict().items()}
                else:
                    patience_counter += 1
                    if patience_counter >= patience:
                        print(f" LSTM early stop at epoch {epoch+1}, val_loss={val_loss:.4f}")
                        break

                if (epoch + 1) % 20 == 0:
                    print(f" Epoch {epoch+1}/{epochs}: loss={avg_loss:.4f}, val_loss={val_loss:.4f}")
            else:
                if (epoch + 1) % 20 == 0:
                    print(f" Epoch {epoch+1}/{epochs}: loss={avg_loss:.4f}")

        if best_state is not None:
            self.model.load_state_dict(best_state)

    def predict_proba(self, X):
        """Return an ``(n, 2)`` array of ``[P(0), P(1)]``.

        ``n`` is ``max(0, len(X) - sequence_length)``; row ``k`` of the result
        corresponds to input row ``k + sequence_length``.
        """
        import torch

        self.model.eval()
        X_seq = self._make_sequences(X)
        if len(X_seq) == 0:
            # Too few rows for even one sequence: nothing to predict.
            return np.empty((0, 2))
        with torch.no_grad():
            proba_pos = self.model(X_seq).cpu().numpy()
        proba_neg = 1.0 - proba_pos
        return np.column_stack([proba_neg, proba_pos])

    def predict(self, X):
        """Return hard 0/1 predictions using a 0.5 probability cut-off."""
        proba = self.predict_proba(X)
        return (proba[:, 1] >= 0.5).astype(int)


# ---------------------------------------------------------------------------
# Model Building
# ---------------------------------------------------------------------------

def _gpu_available() -> bool:
    """Return True when PyTorch is importable and reports a CUDA device."""
    try:
        import torch
    except ImportError:
        return False
    return torch.cuda.is_available()


def build_model(config: dict, input_size: int = 0):
    """Build the ML model based on config.

    Args:
        config: Run config; ``model_type`` selects ``"lstm"``, ``"xgboost"``
            (default), ``"lightgbm"``, ``"catboost"`` or ``"ensemble"``
            (soft-voting over the three tree models). ``hyperparameters``
            supplies the shared hyperparameter names.
        input_size: Feature count; only used by the LSTM.

    Returns:
        An unfitted estimator exposing ``fit`` and ``predict_proba``.

    Raises:
        ValueError: For an unknown ``model_type``.
    """
    model_type = config.get("model_type", "xgboost")
    hp = config.get("hyperparameters", {})

    if model_type == "lstm":
        return LSTMClassifier(input_size, hp)

    if model_type == "xgboost":
        import xgboost as xgb

        params = {
            "learning_rate": hp.get("learning_rate", 0.05),
            "max_depth": hp.get("max_depth", 6),
            "n_estimators": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "colsample_bytree": hp.get("colsample_bytree", 0.8),
            "min_child_weight": hp.get("min_child_weight", 5),
            "gamma": hp.get("gamma", 0.1),
            "reg_alpha": hp.get("reg_alpha", 0.1),
            "reg_lambda": hp.get("reg_lambda", 1.0),
            "eval_metric": "logloss",
            "random_state": 42,
            "device": "cuda" if _gpu_available() else "cpu",
            "verbosity": 0,
        }
        return xgb.XGBClassifier(**params)

    elif model_type == "lightgbm":
        import lightgbm as lgb

        params = {
            "learning_rate": hp.get("learning_rate", 0.05),
            "max_depth": hp.get("max_depth", 6),
            "n_estimators": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "colsample_bytree": hp.get("colsample_bytree", 0.8),
            "min_child_samples": hp.get("min_child_weight", 5),
            "reg_alpha": hp.get("reg_alpha", 0.1),
            "reg_lambda": hp.get("reg_lambda", 1.0),
            "random_state": 42,
            "verbose": -1,
            # Constructing the estimator never validates the device (errors
            # only surface in fit), so decide up front like the other models.
            "device": "gpu" if _gpu_available() else "cpu",
        }
        return lgb.LGBMClassifier(**params)

    elif model_type == "catboost":
        from catboost import CatBoostClassifier

        gpu_available = _gpu_available()
        params = {
            "learning_rate": hp.get("learning_rate", 0.05),
            "depth": hp.get("max_depth", 6),
            "iterations": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "l2_leaf_reg": hp.get("reg_lambda", 1.0),
            "random_seed": 42,
            "verbose": 0,
            "task_type": "GPU" if gpu_available else "CPU",
        }
        if gpu_available:
            # The GPU default bootstrap (Bayesian) rejects `subsample`;
            # Bernoulli supports it. CPU keeps its own default.
            params["bootstrap_type"] = "Bernoulli"
        return CatBoostClassifier(**params)

    elif model_type == "ensemble":
        from sklearn.ensemble import VotingClassifier

        models = []
        for sub_type in ["xgboost", "lightgbm", "catboost"]:
            sub_config = {**config, "model_type": sub_type}
            models.append((sub_type, build_model(sub_config)))
        return VotingClassifier(estimators=models, voting="soft")

    else:
        raise ValueError(f"Unknown model_type: {model_type}")


# ---------------------------------------------------------------------------
# Scaling & PCA
# ---------------------------------------------------------------------------

def apply_scaling_pca(X_train, X_val, X_test, config):
    """Apply StandardScaler and optional PCA.

    Both transforms are fitted on ``X_train`` only and then applied to the
    validation and test arrays. Controlled by ``config["features"]``:
    ``use_scaler``, ``use_pca`` and ``pca_variance`` (default 0.95).

    Returns:
        ``(X_train, X_val, X_test, scaler, pca)``; ``scaler`` / ``pca`` are
        ``None`` when the corresponding step is disabled.
    """
    feat_cfg = config.get("features", {})
    scaler = None
    pca = None

    if feat_cfg.get("use_scaler", False):
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        if X_val is not None:
            X_val = scaler.transform(X_val)
        X_test = scaler.transform(X_test)

    if feat_cfg.get("use_pca", False):
        variance = feat_cfg.get("pca_variance", 0.95)
        pca = PCA(n_components=variance, svd_solver="full")
        X_train = pca.fit_transform(X_train)
        if X_val is not None:
            X_val = pca.transform(X_val)
        X_test = pca.transform(X_test)
        print(f" PCA: {pca.n_components_} components (retaining {variance*100:.0f}% variance)")

    return X_train, X_val, X_test, scaler, pca


# ---------------------------------------------------------------------------
# Walk-Forward / Rolling Window Validation + Backtesting
# ---------------------------------------------------------------------------

def _trade_sharpe(returns: list) -> float:
    """Sharpe-style ratio of per-trade returns (same scaling everywhere).

    Uses ``mean / std * sqrt(252 / n_trades)``; a single trade uses std = 1.0
    and a zero (or undefined) std yields 0.0. Empty input yields 0.0.
    """
    if not returns:
        return 0.0
    mean_r = np.mean(returns)
    std_r = np.std(returns) if len(returns) > 1 else 1.0
    if std_r > 0:
        return float((mean_r / std_r) * np.sqrt(252 / max(1, len(returns))))
    return 0.0


def _run_window(train_df, val_df, test_df, feature_cols, config, strategy, w_idx, n_windows):
    """Scale/PCA one train/val/test split, train on it and backtest it.

    Returns:
        ``(trades, feature_importances_or_None, n_effective_features)``.
    """
    X_train = train_df[feature_cols].values
    y_train = train_df["target"].values
    X_val = val_df[feature_cols].values
    y_val = val_df["target"].values
    X_test = test_df[feature_cols].values

    # Scale and PCA
    X_train, X_val, X_test, _scaler, _pca = apply_scaling_pca(X_train, X_val, X_test, config)
    n_features = X_train.shape[1]

    # Build and train
    trades, fi = _train_and_backtest_window(
        X_train, y_train, X_val, y_val, X_test, test_df,
        config, strategy, n_features, w_idx, n_windows
    )
    return trades, fi, n_features


def _finalize_results(all_trades, per_window_sharpe, fi_sum, fi_count,
                      effective_n_features, feature_cols, config, df):
    """Pick output feature names (PC_i under PCA) and compile the result dict.

    ``fi_sum`` is ``None`` when no window was evaluated; in that case the raw
    feature names are used with all-zero importances.
    """
    use_pca = config.get("features", {}).get("use_pca", False)
    if fi_sum is not None and use_pca:
        effective_feature_names = [f"PC_{i+1}" for i in range(effective_n_features)]
    else:
        effective_feature_names = feature_cols

    if fi_sum is None:
        fi_sum = np.zeros(len(effective_feature_names))

    return compile_results(all_trades, per_window_sharpe, fi_sum,
                           fi_count, effective_feature_names, df)


def walk_forward_train_test(df: pd.DataFrame, feature_cols: list, config: dict) -> dict:
    """Walk-forward or rolling window validation with backtesting.

    Delegates to ``rolling_window_train_test`` when
    ``config["training"]["rolling_window"]`` is truthy. Otherwise the data is
    cut into ``walk_forward_windows`` windows (each extended by 30% of a
    window so it has a test tail), and each window is split chronologically
    into train / validation / test by ``train_pct`` and ``validation_pct``.
    Windows with fewer than 10 test rows or a single-class training target
    are skipped.

    Args:
        df: Feature frame including a ``target`` column.
        feature_cols: Names of the model input columns.
        config: Full run config.

    Returns:
        The result dict produced by ``compile_results``.
    """
    training_cfg = config.get("training", {})
    strategy = config.get("strategy", {})

    if training_cfg.get("rolling_window", False):
        return rolling_window_train_test(df, feature_cols, config)

    # Standard walk-forward
    n_windows = max(1, training_cfg.get("walk_forward_windows", 5))
    train_pct = training_cfg.get("train_pct", 0.7)
    val_pct = training_cfg.get("validation_pct", 0.15)

    n = len(df)
    window_size = n // n_windows

    all_trades = []
    per_window_sharpe = []
    feature_importances_sum = None
    fi_count = 0
    effective_n_features = len(feature_cols)

    for w in range(n_windows):
        start = w * window_size
        end = min((w + 1) * window_size + int(window_size * 0.3), n)  # overlap for test

        window_data = df.iloc[start:end].copy()
        wn = len(window_data)
        train_end = int(wn * train_pct)
        val_end = int(wn * (train_pct + val_pct))

        train_df = window_data.iloc[:train_end]
        val_df = window_data.iloc[train_end:val_end]
        test_df = window_data.iloc[val_end:]

        if len(test_df) < 10 or train_df["target"].nunique() < 2:
            continue

        trades, fi, current_n_features = _run_window(
            train_df, val_df, test_df, feature_cols, config, strategy, w, n_windows
        )

        if feature_importances_sum is None:
            # The first evaluated window fixes the importance vector length.
            effective_n_features = current_n_features
            feature_importances_sum = np.zeros(effective_n_features)

        if fi is not None and len(fi) == effective_n_features:
            feature_importances_sum += fi
            fi_count += 1

        all_trades.extend(trades)

        # Window sharpe
        per_window_sharpe.append(round(_trade_sharpe([t["return_pct"] for t in trades]), 3))
        print(f" Window {w+1}/{n_windows}: {len(trades)} trades, sharpe={per_window_sharpe[-1]}")

    return _finalize_results(all_trades, per_window_sharpe, feature_importances_sum,
                             fi_count, effective_n_features, feature_cols, config, df)


def rolling_window_train_test(df: pd.DataFrame, feature_cols: list, config: dict) -> dict:
    """Rolling window train/test with sliding forward.

    Trains on ``rolling_train_size`` rows (the last ``validation_pct`` of them
    used as validation), tests on the following ``rolling_test_size`` rows,
    then slides forward by ``rolling_test_size``. If the data is shorter than
    one train + test window, an empty result is returned.

    Raises:
        ValueError: If ``rolling_test_size`` is not positive (the window
            could never advance).
    """
    training_cfg = config.get("training", {})
    strategy = config.get("strategy", {})

    train_size = training_cfg.get("rolling_train_size", 2000)
    test_size = training_cfg.get("rolling_test_size", 200)
    val_pct = training_cfg.get("validation_pct", 0.15)
    if test_size <= 0:
        raise ValueError(f"rolling_test_size must be positive, got {test_size}")

    n = len(df)
    all_trades = []
    per_window_sharpe = []
    feature_importances_sum = None
    fi_count = 0
    effective_n_features = len(feature_cols)
    window_count = 0
    total_possible = (n - train_size) // test_size

    start = 0
    while start + train_size + test_size <= n:
        train_end = start + train_size
        test_end = train_end + test_size

        train_full = df.iloc[start:train_end]
        test_df = df.iloc[train_end:test_end]

        if len(test_df) < 10 or train_full["target"].nunique() < 2:
            start += test_size
            continue

        # Split train into train/val
        val_split = int(len(train_full) * (1.0 - val_pct))
        train_df = train_full.iloc[:val_split]
        val_df = train_full.iloc[val_split:]

        window_count += 1
        trades, fi, current_n_features = _run_window(
            train_df, val_df, test_df, feature_cols, config, strategy,
            window_count - 1, total_possible
        )

        if feature_importances_sum is None:
            effective_n_features = current_n_features
            feature_importances_sum = np.zeros(effective_n_features)

        if fi is not None and len(fi) == effective_n_features:
            feature_importances_sum += fi
            fi_count += 1

        all_trades.extend(trades)

        per_window_sharpe.append(round(_trade_sharpe([t["return_pct"] for t in trades]), 3))
        print(f" Rolling window {window_count}: {len(trades)} trades, sharpe={per_window_sharpe[-1]}")

        start += test_size

    return _finalize_results(all_trades, per_window_sharpe, feature_importances_sum,
                             fi_count, effective_n_features, feature_cols, config, df)


def _train_and_backtest_window(X_train, y_train, X_val, y_val, X_test, test_df,
                               config, strategy, n_features, w_idx, n_windows):
    """Train model on one window and backtest. Returns (trades, feature_importances).

    Training errors are reported on stderr and yield ``([], None)`` so one bad
    window does not abort the run. ``model_type == "hybrid"`` is delegated to
    ``_hybrid_train_and_backtest``.
    """
    model_type = config.get("model_type", "xgboost")

    if model_type == "hybrid":
        return _hybrid_train_and_backtest(
            X_train, y_train, X_val, y_val, X_test, test_df,
            config, strategy, n_features
        )

    # Single model path
    is_lstm = model_type == "lstm"
    model = build_model(config, input_size=n_features) if is_lstm else build_model(config)

    try:
        if is_lstm:
            model.fit(X_train, y_train, X_val, y_val)
        else:
            model.fit(X_train, y_train)
    except Exception as e:
        print(f" Window {w_idx+1}: training failed -- {e}", file=sys.stderr)
        return [], None

    # LSTM output starts at row `seq_len` of the test set; other models
    # predict every row.
    offset = config.get("hyperparameters", {}).get("lstm_sequence_length", 20) if is_lstm else 0

    # Get predictions
    try:
        proba = model.predict_proba(X_test)[:, 1]
    except Exception:
        # Fall back to hard predictions used as 0/1 "probabilities".
        proba = model.predict(X_test).astype(float)

    # Apply the same trimming on both paths so signals stay aligned to candles.
    test_df_aligned = test_df.iloc[offset:]

    # Feature importances
    fi = _extract_feature_importances(model, n_features)

    # Backtest
    trades = backtest(test_df_aligned, proba, strategy)
    return trades, fi


def _hybrid_train_and_backtest(X_train, y_train, X_val, y_val, X_test, test_df,
                               config, strategy, n_features):
    """Hybrid: LSTM (60%) + XGBoost (40%), only enter when both agree.

    If either model fails, its probabilities are replaced by a neutral 0.5.
    Where the two models disagree on direction the combined probability is
    set to 0.5. Feature importances come from the XGBoost model.

    Returns:
        ``(trades, feature_importances_or_None)``.
    """
    hp = config.get("hyperparameters", {})
    seq_len = hp.get("lstm_sequence_length", 20)

    # Train LSTM
    lstm_config = {**config, "model_type": "lstm"}
    lstm_model = build_model(lstm_config, input_size=n_features)
    try:
        lstm_model.fit(X_train, y_train, X_val, y_val)
        lstm_proba_full = lstm_model.predict_proba(X_test)[:, 1]
    except Exception as e:
        print(f" Hybrid LSTM failed: {e}", file=sys.stderr)
        lstm_proba_full = np.full(max(0, len(X_test) - seq_len), 0.5)

    # Train XGBoost
    xgb_config = {**config, "model_type": "xgboost"}
    xgb_model = build_model(xgb_config)
    try:
        xgb_model.fit(X_train, y_train)
        xgb_proba_full = xgb_model.predict_proba(X_test)[:, 1]
    except Exception as e:
        print(f" Hybrid XGBoost failed: {e}", file=sys.stderr)
        xgb_proba_full = np.full(len(X_test), 0.5)

    # Align: LSTM output is shorter by seq_len
    xgb_proba = xgb_proba_full[seq_len:]
    lstm_proba = lstm_proba_full
    min_len = min(len(lstm_proba), len(xgb_proba))
    lstm_proba = lstm_proba[:min_len]
    xgb_proba = xgb_proba[:min_len]
    test_df_aligned = test_df.iloc[seq_len:seq_len + min_len]

    # Combine: 60% LSTM + 40% XGBoost, only when both agree
    lstm_weight = 0.6
    xgb_weight = 0.4
    combined_proba = lstm_weight * lstm_proba + xgb_weight * xgb_proba

    # Both must agree on direction (both > 0.5 or both < 0.5)
    agreement = (lstm_proba > 0.5) == (xgb_proba > 0.5)

    # Neutralise signals where models disagree
    combined_proba[~agreement] = 0.5

    # Feature importances from XGBoost
    fi = _extract_feature_importances(xgb_model, n_features)

    trades = backtest(test_df_aligned, combined_proba, strategy)
    return trades, fi


def _extract_feature_importances(model, n_features):
    """Extract normalized feature importances from a model.

    Uses ``feature_importances_`` when it is present and not ``None``,
    otherwise the booster's gain scores when ``get_booster`` exists.

    Returns:
        Array scaled to sum to ~1, or ``None`` when unavailable or on error.
    """
    try:
        fi = getattr(model, "feature_importances_", None)
        if fi is None and hasattr(model, "get_booster"):
            fi_dict = model.get_booster().get_score(importance_type="gain")
            fi = [fi_dict.get(f"f{i}", 0) for i in range(n_features)]
        if fi is None:
            # e.g. LSTMClassifier, which only carries a placeholder attribute.
            return None
        fi = np.asarray(fi, dtype=float)
        return fi / (fi.sum() + 1e-10)
    except Exception:
        return None


# ---------------------------------------------------------------------------
# Backtesting
# ---------------------------------------------------------------------------

def backtest(test_df: pd.DataFrame, proba: np.ndarray, strategy: dict) -> list:
    """Simulate trades using model predictions. Supports ATR-based dynamic SL/TP.

    Long-only, one position at a time. A trade is entered at the close of
    candle ``i`` when ``proba[i]`` reaches both ``entry_threshold`` and
    ``min_confidence_to_trade``. On each following candle the exits are
    checked in this order: stop loss, take profit, trailing stop (only when
    ``exit_type == "trailing_stop"``). A trade still open on the last candle
    is closed at the final close. 0.1% fee is charged on entry and exit.

    Args:
        test_df: Candles with ``close``, ``high``, ``low`` (and ``ATR_14``
            when ``dynamic_sl_tp`` is enabled).
        proba: Probability of the positive class, row-aligned with
            ``test_df``; may be shorter than ``test_df``.
        strategy: Strategy settings dict.

    Returns:
        List of trade dicts (``entry_idx``, ``exit_idx``, ``entry_price``,
        ``exit_price``, ``return_pct``, ``confidence``, ``size_mult``,
        ``duration``).
    """
    entry_threshold = strategy.get("entry_threshold", 0.6)
    stop_loss_fixed = strategy.get("stop_loss_pct", 2.0) / 100
    take_profit_fixed = strategy.get("take_profit_pct", 4.0) / 100
    trailing_stop = strategy.get("trailing_stop_pct", 1.5) / 100
    exit_type = strategy.get("exit_type", "trailing_stop")
    min_confidence = strategy.get("min_confidence_to_trade", 0.55)
    use_dynamic = strategy.get("dynamic_sl_tp", False)
    atr_sl_mult = strategy.get("atr_sl_multiplier", 1.5)
    atr_tp_mult = strategy.get("atr_tp_multiplier", 3.0)
    confidence_scaled = strategy.get("position_sizing") == "confidence_scaled"
    fee = 0.001  # 0.1% per trade

    # A signal has to clear both thresholds, i.e. the stricter one.
    required_proba = max(min_confidence, entry_threshold)

    closes = test_df["close"].values
    highs = test_df["high"].values
    lows = test_df["low"].values

    # ATR for dynamic SL/TP
    if use_dynamic and "ATR_14" in test_df.columns:
        atr_values = test_df["ATR_14"].values
    else:
        atr_values = None

    n = len(closes)
    trades = []
    i = 0
    while i < n - 1 and i < len(proba):
        # `not >=` (instead of `<`) also rejects NaN probabilities.
        if not proba[i] >= required_proba:
            i += 1
            continue

        # Enter trade
        entry_price = closes[i]
        confidence = proba[i]

        # Compute dynamic SL/TP if enabled
        if atr_values is not None and not np.isnan(atr_values[i]):
            atr = atr_values[i]
            stop_loss = (atr * atr_sl_mult) / entry_price
            take_profit = (atr * atr_tp_mult) / entry_price
        else:
            stop_loss = stop_loss_fixed
            take_profit = take_profit_fixed

        # Position sizing based on confidence
        if confidence_scaled:
            if confidence > 0.8:
                size_mult = 1.0
            elif confidence > 0.65:
                size_mult = 0.75
            else:
                size_mult = 0.5
        else:
            size_mult = 1.0

        peak = entry_price
        # Default: no exit rule fires, close at end of test period.
        exit_idx = n - 1
        exit_price = closes[-1]
        for j in range(i + 1, n):
            # NOTE(review): the peak is raised with this candle's high before
            # the trailing stop is tested against the same candle's low, i.e.
            # the high is assumed to precede the low within the candle.
            peak = max(peak, highs[j])

            if (entry_price - lows[j]) / entry_price >= stop_loss:
                exit_price = entry_price * (1 - stop_loss)
            elif (highs[j] - entry_price) / entry_price >= take_profit:
                exit_price = entry_price * (1 + take_profit)
            elif exit_type == "trailing_stop" and (peak - lows[j]) / peak >= trailing_stop:
                exit_price = peak * (1 - trailing_stop)
            else:
                continue
            exit_idx = j
            break

        raw_return = (exit_price - entry_price) / entry_price
        net_return = raw_return - 2 * fee  # entry + exit fees
        net_return *= size_mult

        trades.append({
            "entry_idx": i,
            "exit_idx": exit_idx,
            "entry_price": float(entry_price),
            "exit_price": float(exit_price),
            "return_pct": float(net_return * 100),
            "confidence": float(confidence),
            "size_mult": float(size_mult),
            # Measured to the actual exit candle (also for end-of-period exits).
            "duration": exit_idx - i,
        })

        i = exit_idx + 1  # Skip to after exit

    return trades


def compile_results(trades: list, per_window_sharpe: list, fi_sum: np.ndarray,
                    fi_count: int, feature_cols: list, df: pd.DataFrame) -> dict:
    """Compile all results into output JSON.

    Args:
        trades: Trade dicts as produced by ``backtest``.
        per_window_sharpe: Sharpe value of each evaluated window.
        fi_sum: Sum of per-window normalized feature importances.
        fi_count: Number of windows that contributed to ``fi_sum``.
        feature_cols: Names matching the entries of ``fi_sum``.
        df: Unused; kept for interface compatibility.

    Returns:
        JSON-serializable dict of summary metrics, the top-30 feature
        importances, approximate monthly returns (trades split into ~12
        equal chunks) and an equity curve sampled to roughly 100 points.
    """
    if not trades:
        return {
            "sharpe_ratio": 0.0,
            "total_return_pct": 0.0,
            "max_drawdown_pct": 0.0,
            "win_rate": 0.0,
            "trade_count": 0,
            "profit_factor": 0.0,
            "avg_trade_duration_candles": 0.0,
            "feature_importances": {},
            "monthly_returns": [],
            "equity_curve": [],
            "per_window_sharpe": per_window_sharpe,
        }

    returns = [t["return_pct"] for t in trades]
    wins = [r for r in returns if r > 0]
    losses = [r for r in returns if r <= 0]

    # Compound the per-trade returns into an equity curve starting at 1.0.
    total_return = 1.0
    equity = [1.0]
    for r in returns:
        total_return *= (1 + r / 100)
        equity.append(total_return)

    # Max drawdown
    peak_eq = equity[0]
    max_dd = 0
    for eq in equity:
        peak_eq = max(peak_eq, eq)
        dd = (eq - peak_eq) / peak_eq
        max_dd = min(max_dd, dd)

    # Sharpe (annualized approximation)
    sharpe = _trade_sharpe(returns)

    # Profit factor
    gross_profit = sum(wins) if wins else 0
    gross_loss = abs(sum(losses)) if losses else 1
    profit_factor = gross_profit / gross_loss if gross_loss > 0 else gross_profit

    # Feature importances
    fi_avg = fi_sum / max(fi_count, 1)
    fi_sorted = sorted(zip(feature_cols, fi_avg), key=lambda x: -x[1])
    feature_importances = {name: round(float(val), 4) for name, val in fi_sorted[:30]}

    # Monthly returns (approximate by grouping trades)
    trades_per_month = max(1, len(trades) // 12)
    monthly_returns = [
        round(sum(returns[i:i + trades_per_month]), 2)
        for i in range(0, len(returns), trades_per_month)
    ]

    # Sample equity curve to ~100 points
    if len(equity) > 100:
        step = len(equity) // 100
        equity_sampled = [round(equity[i], 4) for i in range(0, len(equity), step)]
    else:
        equity_sampled = [round(e, 4) for e in equity]

    return {
        "sharpe_ratio": round(sharpe, 3),
        "total_return_pct": round((total_return - 1) * 100, 2),
        "max_drawdown_pct": round(max_dd * 100, 2),
        "win_rate": round(len(wins) / len(returns), 3) if returns else 0,
        "trade_count": len(trades),
        "profit_factor": round(profit_factor, 3),
        "avg_trade_duration_candles": round(np.mean([t["duration"] for t in trades]), 1),
        "feature_importances": feature_importances,
        "monthly_returns": monthly_returns,
        "equity_curve": equity_sampled,
        "per_window_sharpe": per_window_sharpe,
    }


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """CLI entry point: load config + OHLCV CSV, train/backtest, write results JSON."""
    parser = argparse.ArgumentParser(description="BTC ML Trading -- Train & Backtest")
    parser.add_argument("--config", required=True, help="Path to config JSON")
    parser.add_argument("--data", required=True, help="Path to OHLCV CSV")
    parser.add_argument("--output", required=True, help="Path to output results JSON")
    args = parser.parse_args()

    # Load config
    with open(args.config, encoding="utf-8") as f:
        config = json.load(f)

    # Load data
    print(f"Loading data from {args.data}...")
    df = pd.read_csv(args.data, parse_dates=["timestamp"])
    print(f" {len(df)} rows, {df['timestamp'].iloc[0]} -> {df['timestamp'].iloc[-1]}")

    # Compute features
    print("Computing features...")
    df = compute_features(df, config)

    # Create target
    print("Creating target labels...")
    df["target"] = create_target(df, config)

    # Drop NaN rows
    feature_cols = [c for c in df.columns
                    if c not in ["timestamp", "open", "high", "low", "close", "volume", "target"]]
    # Ratio features can be +/-inf (e.g. pct_change across a zero volume);
    # dropna() ignores inf, and the scaler / tree libraries reject it.
    df[feature_cols] = df[feature_cols].replace([np.inf, -np.inf], np.nan)
    df = df.dropna(subset=feature_cols + ["target"]).reset_index(drop=True)
    # Unlabeled tail rows are gone now, so the target can be integer again.
    df["target"] = df["target"].astype(int)
    print(f" {len(df)} rows after dropping NaN, {len(feature_cols)} features")
    print(f" Target distribution: {df['target'].value_counts().to_dict()}")

    # Run walk-forward training + backtesting
    model_type = config.get("model_type", "xgboost")
    rolling = config.get("training", {}).get("rolling_window", False)
    print(f"\nModel: {model_type}, Rolling: {rolling}")
    print("Running validation...")
    results = walk_forward_train_test(df, feature_cols, config)

    # Save results
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)

    print(f"\nResults saved to {args.output}")
    print(f" Sharpe: {results['sharpe_ratio']}, Return: {results['total_return_pct']}%, "
          f"Win Rate: {results['win_rate']}, Trades: {results['trade_count']}")


if __name__ == "__main__":
    main()