#!/usr/bin/env python3
"""
BTC ML Trading Strategy -- Train & Backtest Engine

Self-contained script that runs on the Windows PC with GPU.

Usage:
    python train_and_backtest.py --config config.json --data btc_4h.csv --output results.json
"""

import argparse
import functools
import json
import sys
import warnings
from datetime import datetime

import numpy as np
import pandas as pd
import ta
from ta.momentum import RSIIndicator, StochasticOscillator, WilliamsRIndicator, ROCIndicator
from ta.trend import MACD, CCIIndicator, SMAIndicator, EMAIndicator
from ta.volatility import BollingerBands, AverageTrueRange, KeltnerChannel
from ta.volume import OnBalanceVolumeIndicator

warnings.filterwarnings("ignore")


# ---------------------------------------------------------------------------
# Feature Engineering
# ---------------------------------------------------------------------------

def compute_features(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    """Add technical-indicator feature columns to an OHLCV frame.

    Args:
        df: Frame with ``open``, ``high``, ``low``, ``close`` and ``volume``
            columns. It is modified in place (new columns are appended).
        config: Full run configuration; only the ``"features"`` sub-dict is
            read (``use_volume_features``, ``use_candle_patterns``,
            ``use_lag_features``, ``lag_periods``, ``lookback_periods``).

    Returns:
        The same ``df`` object with the feature columns added. Warm-up rows
        contain NaN, and any +/-inf produced by a division by zero is
        converted to NaN so a later ``dropna`` removes those rows too.
    """
    feat = config.get("features", {})
    close, high, low = df["close"], df["high"], df["low"]
    open_, volume = df["open"], df["volume"]

    # --- Price SMAs & EMAs ---
    for p in [5, 10, 20, 50, 200]:
        df[f"SMA_{p}"] = SMAIndicator(close, window=p).sma_indicator()
        df[f"price_vs_SMA_{p}"] = close / df[f"SMA_{p}"] - 1
    for p in [5, 10, 20, 50]:
        df[f"EMA_{p}"] = EMAIndicator(close, window=p).ema_indicator()

    # --- Momentum ---
    for p in [7, 14, 21]:
        df[f"RSI_{p}"] = RSIIndicator(close, window=p).rsi()

    macd = MACD(close, window_slow=26, window_fast=12, window_sign=9)
    df["MACD_line"] = macd.macd()
    df["MACD_signal"] = macd.macd_signal()
    df["MACD_hist"] = macd.macd_diff()

    stoch = StochasticOscillator(high, low, close, window=14, smooth_window=3)
    df["stoch_k"] = stoch.stoch()
    df["stoch_d"] = stoch.stoch_signal()

    df["williams_r"] = WilliamsRIndicator(high, low, close, lbp=14).williams_r()
    df["ROC_10"] = ROCIndicator(close, window=10).roc()
    df["CCI_20"] = CCIIndicator(high, low, close, window=20).cci()

    # --- Volatility ---
    bb = BollingerBands(close, window=20, window_dev=2)
    df["BB_upper"] = bb.bollinger_hband()
    df["BB_lower"] = bb.bollinger_lband()
    df["BB_width"] = (df["BB_upper"] - df["BB_lower"]) / close
    df["BB_pctb"] = bb.bollinger_pband()

    df["ATR_14"] = AverageTrueRange(high, low, close, window=14).average_true_range()
    df["ATR_pct"] = df["ATR_14"] / close

    kc = KeltnerChannel(high, low, close, window=20)
    df["keltner_upper"] = kc.keltner_channel_hband()
    df["keltner_lower"] = kc.keltner_channel_lband()

    # NOTE(review): sqrt(252) is a daily-bar annualisation factor; the usage
    # example feeds 4h candles. It only rescales the feature -- confirm intent.
    df["hist_volatility"] = close.pct_change().rolling(20).std() * np.sqrt(252)

    # --- Volume ---
    if feat.get("use_volume_features", True):
        df["OBV"] = OnBalanceVolumeIndicator(close, volume).on_balance_volume()
        df["volume_sma_20"] = volume.rolling(20).mean()
        df["volume_ratio"] = volume / df["volume_sma_20"]
        df["volume_momentum"] = volume.pct_change(5)

        # VWAP approximation (rolling)
        typical_price = (high + low + close) / 3
        df["vwap_approx"] = (
            (typical_price * volume).rolling(20).sum() / volume.rolling(20).sum()
        )
        df["price_vs_vwap"] = close / df["vwap_approx"] - 1

    # --- Candle Patterns ---
    if feat.get("use_candle_patterns", True):
        body = (close - open_).abs()
        # A zero-range candle would divide by zero; NaN keeps the ratio undefined.
        full_range = (high - low).replace(0, np.nan)
        body_top = pd.concat([close, open_], axis=1).max(axis=1)
        body_bottom = pd.concat([close, open_], axis=1).min(axis=1)

        df["candle_body_ratio"] = body / full_range
        df["upper_wick_ratio"] = (high - body_top) / full_range
        df["lower_wick_ratio"] = (body_bottom - low) / full_range
        df["is_bullish"] = (close > open_).astype(int)

        # Consecutive up/down: running count inside each run of equal values.
        bullish = df["is_bullish"]
        bearish = 1 - bullish
        df["consecutive_up"] = bullish.groupby(
            (bullish != bullish.shift()).cumsum()
        ).cumsum()
        df["consecutive_down"] = bearish.groupby(
            (bearish != bearish.shift()).cumsum()
        ).cumsum()

    # --- Lag Features ---
    if feat.get("use_lag_features", True):
        for lag in feat.get("lag_periods", [1, 2, 3, 5]):
            df[f"return_lag_{lag}"] = close.pct_change(lag)
            df[f"volume_lag_{lag}"] = volume.pct_change(lag)
            if "RSI_14" in df.columns:
                df[f"RSI_14_lag_{lag}"] = df["RSI_14"].shift(lag)

    # --- Lookback period features ---
    for p in feat.get("lookback_periods", [3, 5, 10, 20]):
        df[f"return_{p}"] = close.pct_change(p)
        df[f"volatility_{p}"] = close.pct_change().rolling(p).std()
        df[f"high_low_range_{p}"] = (high.rolling(p).max() - low.rolling(p).min()) / close

    # pct_change / ratio features yield +/-inf when the denominator is 0
    # (e.g. a zero-volume candle). dropna() does not remove inf, so turn it
    # into NaN here and let the caller's dropna discard those rows.
    df.replace([np.inf, -np.inf], np.nan, inplace=True)

    return df


# ---------------------------------------------------------------------------
# Target Labeling
# ---------------------------------------------------------------------------

def create_target(df: pd.DataFrame, config: dict) -> pd.Series:
    """Create binary target: will price move >= threshold% within horizon?

    Args:
        df: Frame with a ``close`` column.
        config: Full run configuration; only the ``"target"`` sub-dict is read
            (``horizon_candles``, ``threshold_pct``, ``direction``).

    Returns:
        A float Series aligned with ``df``: 1.0 when the move is reached in
        the next ``horizon`` candles, 0.0 when it is not, and NaN for rows
        whose look-ahead window is incomplete (the final ``horizon`` rows) so
        they can be dropped instead of being mislabelled as 0.
    """
    tgt = config.get("target", {})
    horizon = tgt.get("horizon_candles", 6)
    threshold = tgt.get("threshold_pct", 1.0) / 100.0
    direction = tgt.get("direction", "long")

    close = df["close"]
    # Extremes of close[t+1 .. t+horizon], aligned back onto row t.
    ahead = close.shift(-1)
    future_max = ahead.rolling(horizon).max().shift(-horizon + 1)
    future_min = ahead.rolling(horizon).min().shift(-horizon + 1)

    long_hit = (future_max / close) - 1 >= threshold
    short_hit = (close / future_min) - 1 >= threshold

    if direction == "long":
        hit = long_hit
    elif direction == "short":
        hit = short_hit
    else:  # both: a qualifying move in either direction counts
        hit = long_hit | short_hit

    target = hit.astype(float)
    # Comparisons against NaN evaluate to False, which would silently label
    # the unknown tail rows as 0; mark them as missing instead.
    unknown = future_max.isna() | future_min.isna() | close.isna()
    target[unknown] = np.nan
    return target


# ---------------------------------------------------------------------------
# Model Building
# ---------------------------------------------------------------------------

def _cuda_available() -> bool:
    """Return True when torch is importable and reports a CUDA device."""
    try:
        import torch
    except ImportError:
        return False
    return torch.cuda.is_available()


@functools.lru_cache(maxsize=1)
def _lightgbm_gpu_usable() -> bool:
    """Return True when LightGBM can actually train with ``device="gpu"``.

    Constructing ``LGBMClassifier`` does not validate the device; an unusable
    GPU setting only fails inside ``fit``. A one-tree fit on a tiny synthetic
    data set is therefore used as a probe. The result is cached so the probe
    runs at most once per process.
    """
    import lightgbm as lgb

    rng = np.random.default_rng(0)
    X = rng.random((64, 4))
    y = np.arange(64) % 2
    try:
        lgb.LGBMClassifier(device="gpu", n_estimators=1, verbose=-1).fit(X, y)
    except Exception:  # any failure here means "do not use the GPU"
        return False
    return True


def build_model(config: dict):
    """Build the ML model based on config.

    Args:
        config: Run configuration. ``model_type`` selects ``"xgboost"``
            (default), ``"lightgbm"``, ``"catboost"`` or ``"ensemble"``;
            ``hyperparameters`` overrides the defaults below.

    Returns:
        An unfitted classifier. ``"ensemble"`` returns a soft-voting
        ``VotingClassifier`` over the three boosters.

    Raises:
        ValueError: If ``model_type`` is not one of the supported values.
        ImportError: If the selected library is not installed.
    """
    model_type = config.get("model_type", "xgboost")
    hp = config.get("hyperparameters", {})

    if model_type == "xgboost":
        import xgboost as xgb

        params = {
            "learning_rate": hp.get("learning_rate", 0.05),
            "max_depth": hp.get("max_depth", 6),
            "n_estimators": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "colsample_bytree": hp.get("colsample_bytree", 0.8),
            "min_child_weight": hp.get("min_child_weight", 5),
            "gamma": hp.get("gamma", 0.1),
            "reg_alpha": hp.get("reg_alpha", 0.1),
            "reg_lambda": hp.get("reg_lambda", 1.0),
            "eval_metric": "logloss",
            "random_state": 42,
            "device": "cuda" if _cuda_available() else "cpu",
            "verbosity": 0,
        }
        return xgb.XGBClassifier(**params)

    if model_type == "lightgbm":
        import lightgbm as lgb

        params = {
            "learning_rate": hp.get("learning_rate", 0.05),
            "max_depth": hp.get("max_depth", 6),
            "n_estimators": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "colsample_bytree": hp.get("colsample_bytree", 0.8),
            "min_child_samples": hp.get("min_child_weight", 5),
            "reg_alpha": hp.get("reg_alpha", 0.1),
            "reg_lambda": hp.get("reg_lambda", 1.0),
            "random_state": 42,
            "verbose": -1,
            # Fall back to CPU when the installed build cannot train on GPU.
            "device": "gpu" if _lightgbm_gpu_usable() else "cpu",
        }
        return lgb.LGBMClassifier(**params)

    if model_type == "catboost":
        from catboost import CatBoostClassifier

        gpu_available = _cuda_available()
        params = {
            "learning_rate": hp.get("learning_rate", 0.05),
            "depth": hp.get("max_depth", 6),
            "iterations": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "l2_leaf_reg": hp.get("reg_lambda", 1.0),
            "random_seed": 42,
            "verbose": 0,
            "task_type": "GPU" if gpu_available else "CPU",
        }
        if gpu_available:
            # The GPU default bootstrap (Bayesian) rejects `subsample`;
            # Bernoulli supports it. The CPU default is left untouched.
            params["bootstrap_type"] = "Bernoulli"
        return CatBoostClassifier(**params)

    if model_type == "ensemble":
        from sklearn.ensemble import VotingClassifier

        estimators = [
            (sub_type, build_model({**config, "model_type": sub_type}))
            for sub_type in ["xgboost", "lightgbm", "catboost"]
        ]
        return VotingClassifier(estimators=estimators, voting="soft")

    raise ValueError(f"Unknown model_type: {model_type}")


# ---------------------------------------------------------------------------
# Walk-Forward Validation + Backtesting
# ---------------------------------------------------------------------------

def _approx_sharpe(returns: list) -> float:
    """Return the script's approximate annualised Sharpe for per-trade returns.

    Computes ``mean / std * sqrt(252 / n_trades)``. A single trade uses a
    standard deviation of 1.0; an empty list or a zero/undefined standard
    deviation yields 0.0.

    NOTE(review): scaling by ``252 / n_trades`` shrinks the ratio as the trade
    count grows, which is not a conventional annualisation -- kept as-is so
    reported numbers stay comparable with earlier runs; confirm intent.
    """
    if not returns:
        return 0.0
    mean_r = np.mean(returns)
    std_r = np.std(returns) if len(returns) > 1 else 1.0
    if not std_r > 0:
        return 0.0
    return float((mean_r / std_r) * np.sqrt(252 / max(1, len(returns))))


def walk_forward_train_test(df: pd.DataFrame, feature_cols: list, config: dict) -> dict:
    """Walk-forward validation with backtesting on each window.

    The frame is cut into ``walk_forward_windows`` consecutive windows, each
    extended by 30% of a window length. Inside a window the first
    ``train_pct`` of rows train a fresh model and the rows after
    ``train_pct + validation_pct`` are backtested.

    Args:
        df: Frame containing ``feature_cols``, ``target`` and the
            ``close``/``high``/``low`` columns, free of NaN.
        feature_cols: Names of the model input columns.
        config: Run configuration (``training``, ``strategy`` and the model
            settings consumed by ``build_model``).

    Returns:
        The aggregated results dict produced by ``compile_results``.
    """
    training_cfg = config.get("training", {})
    n_windows = training_cfg.get("walk_forward_windows", 5)
    train_pct = training_cfg.get("train_pct", 0.7)
    val_pct = training_cfg.get("validation_pct", 0.15)
    strategy = config.get("strategy", {})

    n = len(df)
    window_size = n // n_windows

    all_trades = []
    per_window_sharpe = []
    feature_importances_sum = np.zeros(len(feature_cols))
    fi_count = 0

    for w in range(n_windows):
        start = w * window_size
        end = min((w + 1) * window_size + int(window_size * 0.3), n)  # overlap for test
        window_data = df.iloc[start:end].copy()

        wn = len(window_data)
        train_end = int(wn * train_pct)
        val_end = int(wn * (train_pct + val_pct))
        train_df = window_data.iloc[:train_end]
        # Rows in [train_end, val_end) are not used for fitting or testing;
        # they only separate the training rows from the test rows.
        test_df = window_data.iloc[val_end:]

        if len(test_df) < 10 or train_df["target"].nunique() < 2:
            continue

        X_train = train_df[feature_cols].values
        y_train = train_df["target"].values
        X_test = test_df[feature_cols].values

        # Train model
        model = build_model(config)
        try:
            model.fit(X_train, y_train)
        except Exception as e:
            print(f" Window {w+1}: training failed -- {e}", file=sys.stderr)
            continue

        # Get predictions on test set
        try:
            proba = model.predict_proba(X_test)[:, 1]
        except Exception:
            # Fall back to hard 0/1 predictions used as pseudo-probabilities.
            proba = model.predict(X_test).astype(float)

        # Extract feature importances (best effort; never aborts the window)
        try:
            if hasattr(model, "feature_importances_"):
                fi = model.feature_importances_
            elif hasattr(model, "get_booster"):
                fi_dict = model.get_booster().get_score(importance_type="gain")
                fi = np.array([fi_dict.get(f"f{i}", 0) for i in range(len(feature_cols))])
            else:
                fi = np.zeros(len(feature_cols))
            # Normalise per window so every window contributes equally.
            feature_importances_sum += fi / (fi.sum() + 1e-10)
            fi_count += 1
        except Exception as e:
            print(f" Window {w+1}: feature importances unavailable -- {e}", file=sys.stderr)

        # Backtest on test set
        trades = backtest(test_df, proba, strategy)
        all_trades.extend(trades)

        # Window sharpe
        if trades:
            window_returns = [t["return_pct"] for t in trades]
            per_window_sharpe.append(round(_approx_sharpe(window_returns), 3))
        else:
            per_window_sharpe.append(0.0)

        print(f" Window {w+1}/{n_windows}: {len(trades)} trades, sharpe={per_window_sharpe[-1]}")

    return compile_results(
        all_trades, per_window_sharpe, feature_importances_sum, fi_count, feature_cols, df
    )


def backtest(test_df: pd.DataFrame, proba: np.ndarray, strategy: dict) -> list:
    """Simulate long trades using model predictions.

    A trade is opened at the close of candle ``i`` when ``proba[i]`` reaches
    both ``min_confidence_to_trade`` and ``entry_threshold``. Following
    candles are scanned for, in this order, stop loss, take profit and (when
    ``exit_type == "trailing_stop"``) trailing stop; otherwise the trade is
    closed at the last close. Only one trade is open at a time.

    Args:
        test_df: Frame with ``close``, ``high`` and ``low`` columns.
        proba: Per-row probability of the positive class, aligned to
            ``test_df``.
        strategy: Strategy settings (thresholds in percent, ``exit_type``,
            ``position_sizing``).

    Returns:
        A list of trade dicts. ``entry_idx``/``exit_idx`` are positions within
        ``test_df``, ``return_pct`` is net of a 0.1% fee per side and scaled
        by ``size_mult``, and ``duration`` is ``exit_idx - entry_idx``.
    """
    entry_threshold = strategy.get("entry_threshold", 0.6)
    stop_loss = strategy.get("stop_loss_pct", 2.0) / 100
    take_profit = strategy.get("take_profit_pct", 4.0) / 100
    trailing_stop = strategy.get("trailing_stop_pct", 1.5) / 100
    exit_type = strategy.get("exit_type", "trailing_stop")
    min_confidence = strategy.get("min_confidence_to_trade", 0.55)
    confidence_scaled = strategy.get("position_sizing") == "confidence_scaled"
    fee = 0.001  # 0.1% per trade

    closes = test_df["close"].values
    highs = test_df["high"].values
    lows = test_df["low"].values
    n = len(closes)

    trades = []
    i = 0
    while i < n - 1:
        if proba[i] < min_confidence or proba[i] < entry_threshold:
            i += 1
            continue

        # Enter trade
        entry_price = closes[i]
        confidence = proba[i]

        # Position sizing based on confidence
        if not confidence_scaled:
            size_mult = 1.0
        elif confidence > 0.8:
            size_mult = 1.0
        elif confidence > 0.65:
            size_mult = 0.75
        else:
            size_mult = 0.5

        peak = entry_price
        j = i + 1
        while j < n:
            current_high = highs[j]
            current_low = lows[j]
            # NOTE(review): the peak is raised with this candle's high before
            # its low is tested, i.e. the high is assumed to print first.
            peak = max(peak, current_high)

            # Check stop loss
            if (entry_price - current_low) / entry_price >= stop_loss:
                exit_price = entry_price * (1 - stop_loss)
                break
            # Check take profit
            if (current_high - entry_price) / entry_price >= take_profit:
                exit_price = entry_price * (1 + take_profit)
                break
            # Check trailing stop
            if exit_type == "trailing_stop" and (peak - current_low) / peak >= trailing_stop:
                exit_price = peak * (1 - trailing_stop)
                break
            j += 1
        else:
            # Exit at end of test period
            exit_price = closes[-1]

        # When no exit fired, j ran one past the last candle; clamp it so the
        # recorded exit index and duration refer to a real candle.
        exit_idx = min(j, n - 1)

        raw_return = (exit_price - entry_price) / entry_price
        net_return = raw_return - 2 * fee  # entry + exit fees
        net_return *= size_mult

        trades.append({
            "entry_idx": i,
            "exit_idx": exit_idx,
            "entry_price": float(entry_price),
            "exit_price": float(exit_price),
            "return_pct": float(net_return * 100),
            "confidence": float(confidence),
            "size_mult": float(size_mult),
            "duration": exit_idx - i,
        })

        i = j + 1  # Skip to after exit

    return trades


def compile_results(trades: list, per_window_sharpe: list, fi_sum: np.ndarray,
                    fi_count: int, feature_cols: list, df: pd.DataFrame) -> dict:
    """Compile all results into output JSON.

    Args:
        trades: Trade dicts from ``backtest`` across all windows, in order.
        per_window_sharpe: Sharpe value recorded for each evaluated window.
        fi_sum: Sum of per-window normalised feature importances.
        fi_count: Number of windows that contributed to ``fi_sum``.
        feature_cols: Feature names aligned with ``fi_sum``.
        df: Full data frame; accepted for interface compatibility and not
            read by this function.

    Returns:
        A JSON-serialisable dict of summary metrics, the top-30 feature
        importances, approximate monthly returns and a sampled equity curve.
    """
    if not trades:
        return {
            "sharpe_ratio": 0.0,
            "total_return_pct": 0.0,
            "max_drawdown_pct": 0.0,
            "win_rate": 0.0,
            "trade_count": 0,
            "profit_factor": 0.0,
            "avg_trade_duration_candles": 0.0,
            "feature_importances": {},
            "monthly_returns": [],
            "equity_curve": [],
            "per_window_sharpe": per_window_sharpe,
        }

    returns = [t["return_pct"] for t in trades]
    wins = [r for r in returns if r > 0]
    losses = [r for r in returns if r <= 0]

    # Compounded equity curve, starting from 1.0.
    total_return = 1.0
    equity = [1.0]
    for r in returns:
        total_return *= (1 + r / 100)
        equity.append(total_return)

    # Max drawdown (most negative peak-to-trough fraction)
    peak_eq = equity[0]
    max_dd = 0
    for eq in equity:
        peak_eq = max(peak_eq, eq)
        max_dd = min(max_dd, (eq - peak_eq) / peak_eq)

    # Sharpe (annualized approximation)
    sharpe = _approx_sharpe(returns)

    # Profit factor; with no losing trades the denominator defaults to 1.
    gross_profit = sum(wins) if wins else 0
    gross_loss = abs(sum(losses)) if losses else 1
    profit_factor = gross_profit / gross_loss if gross_loss > 0 else gross_profit

    # Feature importances: window average, top 30 by value
    fi_avg = fi_sum / max(fi_count, 1)
    fi_sorted = sorted(zip(feature_cols, fi_avg), key=lambda x: -x[1])
    feature_importances = {name: round(float(val), 4) for name, val in fi_sorted[:30]}

    # Monthly returns (approximate by grouping trades)
    trades_per_month = max(1, len(trades) // 12)
    monthly_returns = [
        round(sum(returns[k:k + trades_per_month]), 2)
        for k in range(0, len(returns), trades_per_month)
    ]

    # Sample equity curve to ~100 points
    if len(equity) > 100:
        step = len(equity) // 100
        equity_sampled = [round(equity[k], 4) for k in range(0, len(equity), step)]
    else:
        equity_sampled = [round(e, 4) for e in equity]

    return {
        "sharpe_ratio": round(sharpe, 3),
        "total_return_pct": round((total_return - 1) * 100, 2),
        "max_drawdown_pct": round(max_dd * 100, 2),
        "win_rate": round(len(wins) / len(returns), 3) if returns else 0,
        "trade_count": len(trades),
        "profit_factor": round(profit_factor, 3),
        "avg_trade_duration_candles": round(np.mean([t["duration"] for t in trades]), 1),
        "feature_importances": feature_importances,
        "monthly_returns": monthly_returns,
        "equity_curve": equity_sampled,
        "per_window_sharpe": per_window_sharpe,
    }


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """Parse CLI arguments, run the train/backtest pipeline and write results."""
    parser = argparse.ArgumentParser(description="BTC ML Trading -- Train & Backtest")
    parser.add_argument("--config", required=True, help="Path to config JSON")
    parser.add_argument("--data", required=True, help="Path to OHLCV CSV")
    parser.add_argument("--output", required=True, help="Path to output results JSON")
    args = parser.parse_args()

    # Load config
    with open(args.config, encoding="utf-8") as f:
        config = json.load(f)

    # Load data
    print(f"Loading data from {args.data}...")
    df = pd.read_csv(args.data, parse_dates=["timestamp"])
    print(f" {len(df)} rows, {df['timestamp'].iloc[0]} -> {df['timestamp'].iloc[-1]}")

    # Compute features
    print("Computing features...")
    df = compute_features(df, config)

    # Create target
    print("Creating target labels...")
    df["target"] = create_target(df, config)

    # Drop NaN rows (indicator warm-up, inf-derived gaps, unlabeled tail rows)
    raw_cols = ["timestamp", "open", "high", "low", "close", "volume", "target"]
    feature_cols = [c for c in df.columns if c not in raw_cols]
    df = df.dropna(subset=feature_cols + ["target"]).reset_index(drop=True)
    # The target carries NaN for rows without a full look-ahead window; once
    # those are dropped it is a clean 0/1 column again.
    df["target"] = df["target"].astype(int)
    print(f" {len(df)} rows after dropping NaN, {len(feature_cols)} features")
    print(f" Target distribution: {df['target'].value_counts().to_dict()}")

    # Run walk-forward training + backtesting
    print("\nRunning walk-forward validation...")
    results = walk_forward_train_test(df, feature_cols, config)

    # Save results
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)

    print(f"\nResults saved to {args.output}")
    print(f" Sharpe: {results['sharpe_ratio']}, Return: {results['total_return_pct']}%, "
          f"Win Rate: {results['win_rate']}, Trades: {results['trade_count']}")


if __name__ == "__main__":
    main()