btc-accumulation-monitor/ml_engine/train_and_backtest.py
BizzleBot 8ff35c1a86 feat: complete BTC ML trading strategy optimizer
Multi-machine optimization loop:
- VPS orchestrator coordinates training and LLM analysis
- Windows PC (RTX 4070 Ti) runs XGBoost/LightGBM/CatBoost with GPU
- Mac Mini runs qwen3.5:27b via Ollama for strategy analysis

Includes 60+ technical features, walk-forward validation,
confidence-scaled position sizing, and automated convergence detection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 21:25:44 +00:00

538 lines
20 KiB
Python
Executable File

#!/usr/bin/env python3
"""
BTC ML Trading Strategy — Train & Backtest Engine
Self-contained script that runs on the Windows PC with GPU.
Usage:
python train_and_backtest.py --config config.json --data btc_4h.csv --output results.json
"""
import argparse
import json
import sys
import warnings
import numpy as np
import pandas as pd
from datetime import datetime
import ta
from ta.momentum import RSIIndicator, StochasticOscillator, WilliamsRIndicator, ROCIndicator
from ta.trend import MACD, CCIIndicator, SMAIndicator, EMAIndicator
from ta.volatility import BollingerBands, AverageTrueRange, KeltnerChannel
from ta.volume import OnBalanceVolumeIndicator
warnings.filterwarnings("ignore")
# ---------------------------------------------------------------------------
# Feature Engineering
# ---------------------------------------------------------------------------
def compute_features(df: pd.DataFrame, config: dict) -> pd.DataFrame:
"""Compute 60+ technical features from OHLCV data."""
feat = config.get("features", {})
c, h, l, o, v = df["close"], df["high"], df["low"], df["open"], df["volume"]
# --- Price SMAs & EMAs ---
for p in [5, 10, 20, 50, 200]:
df[f"SMA_{p}"] = SMAIndicator(c, window=p).sma_indicator()
df[f"price_vs_SMA_{p}"] = c / df[f"SMA_{p}"] - 1
for p in [5, 10, 20, 50]:
df[f"EMA_{p}"] = EMAIndicator(c, window=p).ema_indicator()
# --- Momentum ---
for p in [7, 14, 21]:
df[f"RSI_{p}"] = RSIIndicator(c, window=p).rsi()
macd = MACD(c, window_slow=26, window_fast=12, window_sign=9)
df["MACD_line"] = macd.macd()
df["MACD_signal"] = macd.macd_signal()
df["MACD_hist"] = macd.macd_diff()
stoch = StochasticOscillator(h, l, c, window=14, smooth_window=3)
df["stoch_k"] = stoch.stoch()
df["stoch_d"] = stoch.stoch_signal()
df["williams_r"] = WilliamsRIndicator(h, l, c, lbp=14).williams_r()
df["ROC_10"] = ROCIndicator(c, window=10).roc()
df["CCI_20"] = CCIIndicator(h, l, c, window=20).cci()
# --- Volatility ---
bb = BollingerBands(c, window=20, window_dev=2)
df["BB_upper"] = bb.bollinger_hband()
df["BB_lower"] = bb.bollinger_lband()
df["BB_width"] = (df["BB_upper"] - df["BB_lower"]) / c
df["BB_pctb"] = bb.bollinger_pband()
df["ATR_14"] = AverageTrueRange(h, l, c, window=14).average_true_range()
df["ATR_pct"] = df["ATR_14"] / c
kc = KeltnerChannel(h, l, c, window=20)
df["keltner_upper"] = kc.keltner_channel_hband()
df["keltner_lower"] = kc.keltner_channel_lband()
df["hist_volatility"] = c.pct_change().rolling(20).std() * np.sqrt(252)
# --- Volume ---
if feat.get("use_volume_features", True):
df["OBV"] = OnBalanceVolumeIndicator(c, v).on_balance_volume()
df["volume_sma_20"] = v.rolling(20).mean()
df["volume_ratio"] = v / df["volume_sma_20"]
df["volume_momentum"] = v.pct_change(5)
# VWAP approximation (rolling)
tp = (h + l + c) / 3
df["vwap_approx"] = (tp * v).rolling(20).sum() / v.rolling(20).sum()
df["price_vs_vwap"] = c / df["vwap_approx"] - 1
# --- Candle Patterns ---
if feat.get("use_candle_patterns", True):
body = (c - o).abs()
full_range = h - l
df["candle_body_ratio"] = body / full_range.replace(0, np.nan)
df["upper_wick_ratio"] = (h - pd.concat([c, o], axis=1).max(axis=1)) / full_range.replace(0, np.nan)
df["lower_wick_ratio"] = (pd.concat([c, o], axis=1).min(axis=1) - l) / full_range.replace(0, np.nan)
df["is_bullish"] = (c > o).astype(int)
# Consecutive up/down
df["consecutive_up"] = df["is_bullish"].groupby((df["is_bullish"] != df["is_bullish"].shift()).cumsum()).cumsum()
df["consecutive_down"] = (1 - df["is_bullish"]).groupby(((1 - df["is_bullish"]) != (1 - df["is_bullish"]).shift()).cumsum()).cumsum()
# --- Lag Features ---
if feat.get("use_lag_features", True):
lag_periods = feat.get("lag_periods", [1, 2, 3, 5])
for lag in lag_periods:
df[f"return_lag_{lag}"] = c.pct_change(lag)
df[f"volume_lag_{lag}"] = v.pct_change(lag)
if f"RSI_14" in df.columns:
df[f"RSI_14_lag_{lag}"] = df["RSI_14"].shift(lag)
# --- Lookback period features ---
for p in feat.get("lookback_periods", [3, 5, 10, 20]):
df[f"return_{p}"] = c.pct_change(p)
df[f"volatility_{p}"] = c.pct_change().rolling(p).std()
df[f"high_low_range_{p}"] = (h.rolling(p).max() - l.rolling(p).min()) / c
return df
# ---------------------------------------------------------------------------
# Target Labeling
# ---------------------------------------------------------------------------
def create_target(df: pd.DataFrame, config: dict) -> pd.Series:
"""Create binary target: will price move >= threshold% within horizon?"""
tgt = config.get("target", {})
horizon = tgt.get("horizon_candles", 6)
threshold = tgt.get("threshold_pct", 1.0) / 100.0
direction = tgt.get("direction", "long")
future_max = df["close"].shift(-1).rolling(horizon).max().shift(-horizon + 1)
future_min = df["close"].shift(-1).rolling(horizon).min().shift(-horizon + 1)
if direction == "long":
target = ((future_max / df["close"]) - 1 >= threshold).astype(int)
elif direction == "short":
target = ((df["close"] / future_min) - 1 >= threshold).astype(int)
else: # both
long_signal = ((future_max / df["close"]) - 1 >= threshold).astype(int)
short_signal = ((df["close"] / future_min) - 1 >= threshold).astype(int)
target = long_signal # Simplify: use long for now
target[short_signal == 1] = 1
return target
# ---------------------------------------------------------------------------
# Model Building
# ---------------------------------------------------------------------------
def build_model(config: dict):
"""Build the ML model based on config."""
model_type = config.get("model_type", "xgboost")
hp = config.get("hyperparameters", {})
if model_type == "xgboost":
import xgboost as xgb
# Detect GPU
try:
import torch
gpu_available = torch.cuda.is_available()
except ImportError:
gpu_available = False
params = {
"learning_rate": hp.get("learning_rate", 0.05),
"max_depth": hp.get("max_depth", 6),
"n_estimators": hp.get("n_estimators", 500),
"subsample": hp.get("subsample", 0.8),
"colsample_bytree": hp.get("colsample_bytree", 0.8),
"min_child_weight": hp.get("min_child_weight", 5),
"gamma": hp.get("gamma", 0.1),
"reg_alpha": hp.get("reg_alpha", 0.1),
"reg_lambda": hp.get("reg_lambda", 1.0),
"eval_metric": "logloss",
"random_state": 42,
"device": "cuda" if gpu_available else "cpu",
"verbosity": 0,
}
return xgb.XGBClassifier(**params)
elif model_type == "lightgbm":
import lightgbm as lgb
params = {
"learning_rate": hp.get("learning_rate", 0.05),
"max_depth": hp.get("max_depth", 6),
"n_estimators": hp.get("n_estimators", 500),
"subsample": hp.get("subsample", 0.8),
"colsample_bytree": hp.get("colsample_bytree", 0.8),
"min_child_samples": hp.get("min_child_weight", 5),
"reg_alpha": hp.get("reg_alpha", 0.1),
"reg_lambda": hp.get("reg_lambda", 1.0),
"random_state": 42,
"verbose": -1,
}
try:
params["device"] = "gpu"
model = lgb.LGBMClassifier(**params)
return model
except Exception:
params["device"] = "cpu"
return lgb.LGBMClassifier(**params)
elif model_type == "catboost":
from catboost import CatBoostClassifier
try:
import torch
gpu_available = torch.cuda.is_available()
except ImportError:
gpu_available = False
params = {
"learning_rate": hp.get("learning_rate", 0.05),
"depth": hp.get("max_depth", 6),
"iterations": hp.get("n_estimators", 500),
"subsample": hp.get("subsample", 0.8),
"l2_leaf_reg": hp.get("reg_lambda", 1.0),
"random_seed": 42,
"verbose": 0,
"task_type": "GPU" if gpu_available else "CPU",
}
return CatBoostClassifier(**params)
elif model_type == "ensemble":
from sklearn.ensemble import VotingClassifier
models = []
for sub_type in ["xgboost", "lightgbm", "catboost"]:
sub_config = {**config, "model_type": sub_type}
m = build_model(sub_config)
models.append((sub_type, m))
return VotingClassifier(estimators=models, voting="soft")
else:
raise ValueError(f"Unknown model_type: {model_type}")
# ---------------------------------------------------------------------------
# Walk-Forward Validation + Backtesting
# ---------------------------------------------------------------------------
def walk_forward_train_test(df: pd.DataFrame, feature_cols: list, config: dict) -> dict:
"""Walk-forward validation with backtesting on each window."""
training_cfg = config.get("training", {})
n_windows = training_cfg.get("walk_forward_windows", 5)
train_pct = training_cfg.get("train_pct", 0.7)
val_pct = training_cfg.get("validation_pct", 0.15)
n = len(df)
window_size = n // n_windows
strategy = config.get("strategy", {})
all_trades = []
per_window_sharpe = []
feature_importances_sum = np.zeros(len(feature_cols))
fi_count = 0
for w in range(n_windows):
start = w * window_size
end = min((w + 1) * window_size + int(window_size * 0.3), n) # overlap for test
if end > n:
end = n
window_data = df.iloc[start:end].copy()
wn = len(window_data)
train_end = int(wn * train_pct)
val_end = int(wn * (train_pct + val_pct))
train_df = window_data.iloc[:train_end]
val_df = window_data.iloc[train_end:val_end]
test_df = window_data.iloc[val_end:]
if len(test_df) < 10 or train_df["target"].nunique() < 2:
continue
X_train = train_df[feature_cols].values
y_train = train_df["target"].values
X_val = val_df[feature_cols].values
y_val = val_df["target"].values
X_test = test_df[feature_cols].values
# Train model
model = build_model(config)
try:
model.fit(X_train, y_train)
except Exception as e:
print(f" Window {w+1}: training failed — {e}", file=sys.stderr)
continue
# Get predictions on test set
try:
proba = model.predict_proba(X_test)[:, 1]
except Exception:
preds = model.predict(X_test)
proba = preds.astype(float)
# Extract feature importances
try:
if hasattr(model, "feature_importances_"):
fi = model.feature_importances_
elif hasattr(model, "get_booster"):
fi_dict = model.get_booster().get_score(importance_type="gain")
fi = np.array([fi_dict.get(f"f{i}", 0) for i in range(len(feature_cols))])
else:
fi = np.zeros(len(feature_cols))
feature_importances_sum += fi / (fi.sum() + 1e-10)
fi_count += 1
except Exception:
pass
# Backtest on test set
trades = backtest(test_df, proba, strategy)
all_trades.extend(trades)
# Window sharpe
if trades:
returns = [t["return_pct"] for t in trades]
mean_r = np.mean(returns)
std_r = np.std(returns) if len(returns) > 1 else 1.0
sharpe = (mean_r / std_r) * np.sqrt(252 / max(1, len(trades))) if std_r > 0 else 0
per_window_sharpe.append(round(sharpe, 3))
else:
per_window_sharpe.append(0.0)
print(f" Window {w+1}/{n_windows}: {len(trades)} trades, sharpe={per_window_sharpe[-1]}")
return compile_results(all_trades, per_window_sharpe, feature_importances_sum, fi_count, feature_cols, df)
def backtest(test_df: pd.DataFrame, proba: np.ndarray, strategy: dict) -> list:
"""Simulate trades using model predictions."""
entry_threshold = strategy.get("entry_threshold", 0.6)
stop_loss = strategy.get("stop_loss_pct", 2.0) / 100
take_profit = strategy.get("take_profit_pct", 4.0) / 100
trailing_stop = strategy.get("trailing_stop_pct", 1.5) / 100
exit_type = strategy.get("exit_type", "trailing_stop")
min_confidence = strategy.get("min_confidence_to_trade", 0.55)
fee = 0.001 # 0.1% per trade
closes = test_df["close"].values
highs = test_df["high"].values
lows = test_df["low"].values
trades = []
i = 0
while i < len(closes) - 1:
if proba[i] < min_confidence or proba[i] < entry_threshold:
i += 1
continue
# Enter trade
entry_price = closes[i]
confidence = proba[i]
# Position sizing based on confidence
if strategy.get("position_sizing") == "confidence_scaled":
if confidence > 0.8:
size_mult = 1.0
elif confidence > 0.65:
size_mult = 0.75
else:
size_mult = 0.5
else:
size_mult = 1.0
peak = entry_price
j = i + 1
while j < len(closes):
current_high = highs[j]
current_low = lows[j]
current_close = closes[j]
peak = max(peak, current_high)
# Check stop loss
if (entry_price - current_low) / entry_price >= stop_loss:
exit_price = entry_price * (1 - stop_loss)
break
# Check take profit
if (current_high - entry_price) / entry_price >= take_profit:
exit_price = entry_price * (1 + take_profit)
break
# Check trailing stop
if exit_type == "trailing_stop" and (peak - current_low) / peak >= trailing_stop:
exit_price = peak * (1 - trailing_stop)
break
j += 1
else:
# Exit at end of test period
exit_price = closes[-1]
raw_return = (exit_price - entry_price) / entry_price
net_return = raw_return - 2 * fee # entry + exit fees
net_return *= size_mult
trades.append({
"entry_idx": i,
"exit_idx": j if j < len(closes) else len(closes) - 1,
"entry_price": float(entry_price),
"exit_price": float(exit_price),
"return_pct": float(net_return * 100),
"confidence": float(confidence),
"size_mult": float(size_mult),
"duration": j - i,
})
i = j + 1 # Skip to after exit
return trades
def compile_results(trades: list, per_window_sharpe: list,
fi_sum: np.ndarray, fi_count: int,
feature_cols: list, df: pd.DataFrame) -> dict:
"""Compile all results into output JSON."""
if not trades:
return {
"sharpe_ratio": 0.0,
"total_return_pct": 0.0,
"max_drawdown_pct": 0.0,
"win_rate": 0.0,
"trade_count": 0,
"profit_factor": 0.0,
"avg_trade_duration_candles": 0.0,
"feature_importances": {},
"monthly_returns": [],
"equity_curve": [],
"per_window_sharpe": per_window_sharpe,
}
returns = [t["return_pct"] for t in trades]
wins = [r for r in returns if r > 0]
losses = [r for r in returns if r <= 0]
total_return = 1.0
equity = [1.0]
for r in returns:
total_return *= (1 + r / 100)
equity.append(total_return)
# Max drawdown
peak_eq = equity[0]
max_dd = 0
for eq in equity:
peak_eq = max(peak_eq, eq)
dd = (eq - peak_eq) / peak_eq
max_dd = min(max_dd, dd)
# Sharpe (annualized approximation)
mean_r = np.mean(returns)
std_r = np.std(returns) if len(returns) > 1 else 1.0
trades_per_year = 252 # approximate
sharpe = (mean_r / std_r) * np.sqrt(trades_per_year / max(1, len(returns))) if std_r > 0 else 0
# Profit factor
gross_profit = sum(wins) if wins else 0
gross_loss = abs(sum(losses)) if losses else 1
profit_factor = gross_profit / gross_loss if gross_loss > 0 else gross_profit
# Feature importances
fi_avg = fi_sum / max(fi_count, 1)
fi_sorted = sorted(zip(feature_cols, fi_avg), key=lambda x: -x[1])
feature_importances = {name: round(float(val), 4) for name, val in fi_sorted[:30]}
# Monthly returns (approximate by grouping trades)
monthly_returns = []
trades_per_month = max(1, len(trades) // 12)
for i in range(0, len(returns), trades_per_month):
chunk = returns[i:i + trades_per_month]
monthly_returns.append(round(sum(chunk), 2))
# Sample equity curve to ~100 points
if len(equity) > 100:
step = len(equity) // 100
equity_sampled = [round(equity[i], 4) for i in range(0, len(equity), step)]
else:
equity_sampled = [round(e, 4) for e in equity]
return {
"sharpe_ratio": round(sharpe, 3),
"total_return_pct": round((total_return - 1) * 100, 2),
"max_drawdown_pct": round(max_dd * 100, 2),
"win_rate": round(len(wins) / len(returns), 3) if returns else 0,
"trade_count": len(trades),
"profit_factor": round(profit_factor, 3),
"avg_trade_duration_candles": round(np.mean([t["duration"] for t in trades]), 1),
"feature_importances": feature_importances,
"monthly_returns": monthly_returns,
"equity_curve": equity_sampled,
"per_window_sharpe": per_window_sharpe,
}
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="BTC ML Trading — Train & Backtest")
parser.add_argument("--config", required=True, help="Path to config JSON")
parser.add_argument("--data", required=True, help="Path to OHLCV CSV")
parser.add_argument("--output", required=True, help="Path to output results JSON")
args = parser.parse_args()
# Load config
with open(args.config) as f:
config = json.load(f)
# Load data
print(f"Loading data from {args.data}...")
df = pd.read_csv(args.data, parse_dates=["timestamp"])
print(f" {len(df)} rows, {df['timestamp'].iloc[0]}{df['timestamp'].iloc[-1]}")
# Compute features
print("Computing features...")
df = compute_features(df, config)
# Create target
print("Creating target labels...")
df["target"] = create_target(df, config)
# Drop NaN rows
feature_cols = [c for c in df.columns if c not in ["timestamp", "open", "high", "low", "close", "volume", "target"]]
df = df.dropna(subset=feature_cols + ["target"]).reset_index(drop=True)
print(f" {len(df)} rows after dropping NaN, {len(feature_cols)} features")
print(f" Target distribution: {df['target'].value_counts().to_dict()}")
# Run walk-forward training + backtesting
print("\nRunning walk-forward validation...")
results = walk_forward_train_test(df, feature_cols, config)
# Save results
with open(args.output, "w") as f:
json.dump(results, f, indent=2)
print(f"\nResults saved to {args.output}")
print(f" Sharpe: {results['sharpe_ratio']}, Return: {results['total_return_pct']}%, "
f"Win Rate: {results['win_rate']}, Trades: {results['trade_count']}")
if __name__ == "__main__":
main()