Multi-machine optimization loop: - VPS orchestrator coordinates training and LLM analysis - Windows PC (RTX 4070 Ti) runs XGBoost/LightGBM/CatBoost with GPU - Mac Mini runs qwen3.5:27b via Ollama for strategy analysis Includes 60+ technical features, walk-forward validation, confidence-scaled position sizing, and automated convergence detection. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
538 lines
20 KiB
Python
Executable File
538 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
BTC ML Trading Strategy — Train & Backtest Engine
|
|
Self-contained script that runs on the Windows PC with GPU.
|
|
|
|
Usage:
|
|
python train_and_backtest.py --config config.json --data btc_4h.csv --output results.json
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import warnings
|
|
import numpy as np
|
|
import pandas as pd
|
|
from datetime import datetime
|
|
|
|
import ta
|
|
from ta.momentum import RSIIndicator, StochasticOscillator, WilliamsRIndicator, ROCIndicator
|
|
from ta.trend import MACD, CCIIndicator, SMAIndicator, EMAIndicator
|
|
from ta.volatility import BollingerBands, AverageTrueRange, KeltnerChannel
|
|
from ta.volume import OnBalanceVolumeIndicator
|
|
|
|
warnings.filterwarnings("ignore")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Feature Engineering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def compute_features(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    """Compute 60+ technical features from OHLCV data.

    Feature columns are written into ``df`` itself and the same frame is
    returned, so the caller's frame is modified.

    Args:
        df: Frame with ``open``, ``high``, ``low``, ``close`` and ``volume``
            columns. Rolling and shifted features treat row order as time
            order, so rows are assumed to be chronological.
        config: Run configuration. The optional ``"features"`` section may set
            ``use_volume_features``, ``use_candle_patterns``,
            ``use_lag_features`` (all default ``True``), ``lag_periods``
            (default ``[1, 2, 3, 5]``) and ``lookback_periods``
            (default ``[3, 5, 10, 20]``).

    Returns:
        ``df`` with the feature columns appended. Rows inside an indicator's
        warm-up period hold NaN. Infinite values in the generated columns
        (for example a percentage change from a zero-volume candle) are
        replaced by NaN so that a later ``dropna`` removes those rows instead
        of passing ``inf`` to the model.
    """
    feat = config.get("features", {})
    close, high, low = df["close"], df["high"], df["low"]
    open_, volume = df["open"], df["volume"]
    # Remember the input columns so only generated features are sanitised.
    input_columns = set(df.columns)

    # --- Price SMAs & EMAs ---
    for p in [5, 10, 20, 50, 200]:
        df[f"SMA_{p}"] = SMAIndicator(close, window=p).sma_indicator()
        df[f"price_vs_SMA_{p}"] = close / df[f"SMA_{p}"] - 1
    for p in [5, 10, 20, 50]:
        df[f"EMA_{p}"] = EMAIndicator(close, window=p).ema_indicator()

    # --- Momentum ---
    for p in [7, 14, 21]:
        df[f"RSI_{p}"] = RSIIndicator(close, window=p).rsi()

    macd = MACD(close, window_slow=26, window_fast=12, window_sign=9)
    df["MACD_line"] = macd.macd()
    df["MACD_signal"] = macd.macd_signal()
    df["MACD_hist"] = macd.macd_diff()

    stoch = StochasticOscillator(high, low, close, window=14, smooth_window=3)
    df["stoch_k"] = stoch.stoch()
    df["stoch_d"] = stoch.stoch_signal()

    df["williams_r"] = WilliamsRIndicator(high, low, close, lbp=14).williams_r()
    df["ROC_10"] = ROCIndicator(close, window=10).roc()
    df["CCI_20"] = CCIIndicator(high, low, close, window=20).cci()

    # --- Volatility ---
    bb = BollingerBands(close, window=20, window_dev=2)
    df["BB_upper"] = bb.bollinger_hband()
    df["BB_lower"] = bb.bollinger_lband()
    df["BB_width"] = (df["BB_upper"] - df["BB_lower"]) / close
    df["BB_pctb"] = bb.bollinger_pband()

    df["ATR_14"] = AverageTrueRange(high, low, close, window=14).average_true_range()
    df["ATR_pct"] = df["ATR_14"] / close

    kc = KeltnerChannel(high, low, close, window=20)
    df["keltner_upper"] = kc.keltner_channel_hband()
    df["keltner_lower"] = kc.keltner_channel_lband()

    # NOTE(review): sqrt(252) is a daily-bar annualisation factor; confirm it
    # is intended for the candle size of the input data.
    df["hist_volatility"] = close.pct_change().rolling(20).std() * np.sqrt(252)

    # --- Volume ---
    if feat.get("use_volume_features", True):
        df["OBV"] = OnBalanceVolumeIndicator(close, volume).on_balance_volume()
        df["volume_sma_20"] = volume.rolling(20).mean()
        df["volume_ratio"] = volume / df["volume_sma_20"]
        df["volume_momentum"] = volume.pct_change(5)
        # VWAP approximation (rolling)
        typical_price = (high + low + close) / 3
        df["vwap_approx"] = (typical_price * volume).rolling(20).sum() / volume.rolling(20).sum()
        df["price_vs_vwap"] = close / df["vwap_approx"] - 1

    # --- Candle Patterns ---
    if feat.get("use_candle_patterns", True):
        body = (close - open_).abs()
        # A zero-range candle would divide by zero; treat its ratios as missing.
        safe_range = (high - low).replace(0, np.nan)
        open_close = pd.concat([close, open_], axis=1)
        df["candle_body_ratio"] = body / safe_range
        df["upper_wick_ratio"] = (high - open_close.max(axis=1)) / safe_range
        df["lower_wick_ratio"] = (open_close.min(axis=1) - low) / safe_range
        df["is_bullish"] = (close > open_).astype(int)
        # Consecutive up/down: running count within each streak of equal
        # candle direction. Bullish and bearish streaks change on the same
        # rows, so one streak id serves both counters.
        bullish = df["is_bullish"]
        bearish = 1 - bullish
        streak_id = (bullish != bullish.shift()).cumsum()
        df["consecutive_up"] = bullish.groupby(streak_id).cumsum()
        df["consecutive_down"] = bearish.groupby(streak_id).cumsum()

    # --- Lag Features ---
    if feat.get("use_lag_features", True):
        for lag in feat.get("lag_periods", [1, 2, 3, 5]):
            df[f"return_lag_{lag}"] = close.pct_change(lag)
            df[f"volume_lag_{lag}"] = volume.pct_change(lag)
            if "RSI_14" in df.columns:
                df[f"RSI_14_lag_{lag}"] = df["RSI_14"].shift(lag)

    # --- Lookback period features ---
    for p in feat.get("lookback_periods", [3, 5, 10, 20]):
        df[f"return_{p}"] = close.pct_change(p)
        df[f"volatility_{p}"] = close.pct_change().rolling(p).std()
        df[f"high_low_range_{p}"] = (high.rolling(p).max() - low.rolling(p).min()) / close

    # Divisions and percentage changes over zero denominators yield +/-inf,
    # which dropna() does not remove; convert them to NaN in the new columns.
    generated = [col for col in df.columns if col not in input_columns]
    if generated:
        df[generated] = df[generated].replace([np.inf, -np.inf], np.nan)

    return df
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Target Labeling
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_target(df: pd.DataFrame, config: dict) -> pd.Series:
    """Create binary target: will price move >= threshold% within horizon?

    For each row the closes of the next ``horizon_candles`` rows are compared
    with the row's own close.

    Args:
        df: Frame with a ``close`` column; row order is treated as time order.
        config: Run configuration. The optional ``"target"`` section may set
            ``horizon_candles`` (default 6), ``threshold_pct`` (default 1.0,
            in percent) and ``direction`` (``"long"``, ``"short"``, or any
            other value for either direction; default ``"long"``).

    Returns:
        Float series aligned with ``df``: 1.0 when the move was reached,
        0.0 when it was not, and NaN for the final ``horizon_candles`` rows,
        whose future window is incomplete. Labelling those rows 0 would
        inject false negatives; NaN lets the caller drop them.

    Raises:
        ValueError: If ``horizon_candles`` is smaller than 1.
    """
    tgt = config.get("target", {})
    horizon = int(tgt.get("horizon_candles", 6))
    if horizon < 1:
        raise ValueError(f"horizon_candles must be >= 1, got {horizon}")
    threshold = tgt.get("threshold_pct", 1.0) / 100.0
    direction = tgt.get("direction", "long")

    close = df["close"]
    # rolling(h) at row i+h covers rows i+1..i+h; shifting by -h aligns that
    # window with row i, i.e. the extreme close over the next h candles.
    future_max = close.rolling(horizon).max().shift(-horizon)
    future_min = close.rolling(horizon).min().shift(-horizon)

    long_hit = (future_max / close) - 1 >= threshold
    short_hit = (close / future_min) - 1 >= threshold

    if direction == "long":
        hit = long_hit
    elif direction == "short":
        hit = short_hit
    else:  # both: a move in either direction counts
        hit = long_hit | short_hit

    target = hit.astype(float)
    # Comparisons against NaN evaluate to False, so rows without a complete
    # future window must be marked unknown explicitly.
    target[future_max.isna()] = float("nan")
    return target
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Model Building
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Result of the one-off LightGBM GPU probe, shared by all build_model() calls.
_LGBM_DEVICE_CACHE: dict = {}


def _cuda_available() -> bool:
    """Return True when PyTorch can be imported and reports a CUDA device."""
    try:
        import torch
    except (ImportError, OSError):
        # OSError covers a present-but-broken install (e.g. DLL load failure).
        return False
    return bool(torch.cuda.is_available())


def _use_gpu(config: dict) -> bool:
    """Resolve GPU usage: explicit ``config["use_gpu"]`` wins, else auto-detect."""
    requested = config.get("use_gpu")
    if requested is None:
        return _cuda_available()
    return bool(requested)


def _lightgbm_device(lgb) -> str:
    """Return ``"gpu"`` if this LightGBM build can train on GPU, else ``"cpu"``.

    The scikit-learn wrapper does not validate ``device`` in its constructor;
    an unsupported device only fails inside ``fit``. A tiny probe fit is the
    reliable check, and its outcome is cached for the process lifetime.
    """
    if "device" not in _LGBM_DEVICE_CACHE:
        probe_x = np.arange(64, dtype=float).reshape(-1, 1)
        probe_y = np.arange(64) % 2
        try:
            lgb.LGBMClassifier(n_estimators=1, device="gpu", verbose=-1).fit(probe_x, probe_y)
        except Exception:
            # Any failure of the probe means GPU training is unusable here.
            _LGBM_DEVICE_CACHE["device"] = "cpu"
        else:
            _LGBM_DEVICE_CACHE["device"] = "gpu"
    return _LGBM_DEVICE_CACHE["device"]


def build_model(config: dict):
    """Build the ML model based on config.

    Args:
        config: Run configuration. ``model_type`` selects ``"xgboost"``
            (default), ``"lightgbm"``, ``"catboost"`` or ``"ensemble"`` (a
            soft-voting combination of the three). ``hyperparameters`` holds
            optional overrides. The optional ``use_gpu`` flag forces GPU usage
            on or off; when absent the GPU is auto-detected.

    Returns:
        An unfitted scikit-learn compatible classifier.

    Raises:
        ValueError: If ``model_type`` is not one of the supported values.
    """
    model_type = config.get("model_type", "xgboost")
    hp = config.get("hyperparameters", {})

    if model_type == "xgboost":
        import xgboost as xgb

        params = {
            "learning_rate": hp.get("learning_rate", 0.05),
            "max_depth": hp.get("max_depth", 6),
            "n_estimators": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "colsample_bytree": hp.get("colsample_bytree", 0.8),
            "min_child_weight": hp.get("min_child_weight", 5),
            "gamma": hp.get("gamma", 0.1),
            "reg_alpha": hp.get("reg_alpha", 0.1),
            "reg_lambda": hp.get("reg_lambda", 1.0),
            "eval_metric": "logloss",
            "random_state": 42,
            "device": "cuda" if _use_gpu(config) else "cpu",
            "verbosity": 0,
        }
        return xgb.XGBClassifier(**params)

    elif model_type == "lightgbm":
        import lightgbm as lgb

        params = {
            "learning_rate": hp.get("learning_rate", 0.05),
            "max_depth": hp.get("max_depth", 6),
            "n_estimators": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            # LightGBM ignores `subsample` unless bagging is switched on with
            # a non-zero frequency.
            "subsample_freq": hp.get("subsample_freq", 1),
            "colsample_bytree": hp.get("colsample_bytree", 0.8),
            # The shared `min_child_weight` knob is mapped onto LightGBM's
            # `min_child_samples` unless the latter is given explicitly.
            "min_child_samples": hp.get("min_child_samples", hp.get("min_child_weight", 5)),
            "reg_alpha": hp.get("reg_alpha", 0.1),
            "reg_lambda": hp.get("reg_lambda", 1.0),
            "random_state": 42,
            "verbose": -1,
        }
        # An explicit use_gpu=False skips the probe; otherwise fall back to
        # CPU whenever the probe shows GPU training is unavailable.
        params["device"] = "cpu" if config.get("use_gpu") is False else _lightgbm_device(lgb)
        return lgb.LGBMClassifier(**params)

    elif model_type == "catboost":
        from catboost import CatBoostClassifier

        gpu_available = _use_gpu(config)
        params = {
            "learning_rate": hp.get("learning_rate", 0.05),
            "depth": hp.get("max_depth", 6),
            "iterations": hp.get("n_estimators", 500),
            "subsample": hp.get("subsample", 0.8),
            "l2_leaf_reg": hp.get("reg_lambda", 1.0),
            "random_seed": 42,
            "verbose": 0,
            "task_type": "GPU" if gpu_available else "CPU",
        }
        if gpu_available:
            # On GPU the default bootstrap type is Bayesian, which rejects
            # `subsample`; Bernoulli supports it.
            params["bootstrap_type"] = "Bernoulli"
        return CatBoostClassifier(**params)

    elif model_type == "ensemble":
        from sklearn.ensemble import VotingClassifier

        models = [
            (sub_type, build_model({**config, "model_type": sub_type}))
            for sub_type in ["xgboost", "lightgbm", "catboost"]
        ]
        return VotingClassifier(estimators=models, voting="soft")

    else:
        raise ValueError(f"Unknown model_type: {model_type}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Walk-Forward Validation + Backtesting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _normalized_importances(model, n_features: int):
    """Return the model's feature importances scaled to sum to 1, or None.

    Voting ensembles expose no importances of their own, so the importances
    of their fitted members are averaged. None is returned when nothing
    usable is available, so the caller does not count an all-zero vector.
    """
    if hasattr(model, "feature_importances_"):
        raw = np.asarray(model.feature_importances_, dtype=float)
    elif hasattr(model, "get_booster"):
        scores = model.get_booster().get_score(importance_type="gain")
        raw = np.array([scores.get(f"f{i}", 0) for i in range(n_features)], dtype=float)
    elif hasattr(model, "estimators_"):
        members = [_normalized_importances(m, n_features) for m in model.estimators_]
        members = [m for m in members if m is not None]
        if not members:
            return None
        raw = np.mean(members, axis=0)
    else:
        return None

    if raw.shape != (n_features,):
        return None
    total = raw.sum()
    if not np.isfinite(total) or total <= 0:
        return None
    return raw / total


def walk_forward_train_test(df: pd.DataFrame, feature_cols: list, config: dict) -> dict:
    """Walk-forward validation with backtesting on each window.

    The frame is cut into ``walk_forward_windows`` consecutive windows, each
    extended by 30% of the window size into the following rows. Inside a
    window the first ``train_pct`` of rows train the model, the next
    ``validation_pct`` are left out as a gap between training and test data,
    and the remainder is backtested.

    Args:
        df: Frame holding ``feature_cols``, ``target`` and the price columns
            needed by ``backtest``.
        feature_cols: Names of the model input columns.
        config: Run configuration; the ``training`` and ``strategy`` sections
            are read here and the whole config is passed to ``build_model``.

    Returns:
        The metrics dictionary produced by ``compile_results``.

    Raises:
        ValueError: If ``walk_forward_windows`` is smaller than 1.
    """
    training_cfg = config.get("training", {})
    n_windows = int(training_cfg.get("walk_forward_windows", 5))
    if n_windows < 1:
        raise ValueError(f"walk_forward_windows must be >= 1, got {n_windows}")
    train_pct = training_cfg.get("train_pct", 0.7)
    val_pct = training_cfg.get("validation_pct", 0.15)
    strategy = config.get("strategy", {})

    n = len(df)
    window_size = n // n_windows
    n_features = len(feature_cols)

    all_trades = []
    per_window_sharpe = []
    feature_importances_sum = np.zeros(n_features)
    fi_count = 0

    for w in range(n_windows):
        start = w * window_size
        # Extend past the nominal window so the test slice has enough rows.
        end = min((w + 1) * window_size + int(window_size * 0.3), n)

        window_data = df.iloc[start:end]
        wn = len(window_data)
        train_end = int(wn * train_pct)
        val_end = int(wn * (train_pct + val_pct))

        train_df = window_data.iloc[:train_end]
        # Rows train_end..val_end are deliberately unused (no early stopping).
        test_df = window_data.iloc[val_end:]

        # A classifier needs both classes, and a tiny test slice is noise.
        if len(test_df) < 10 or train_df["target"].nunique() < 2:
            continue

        X_train = train_df[feature_cols].values
        y_train = train_df["target"].values
        X_test = test_df[feature_cols].values

        # Train model
        model = build_model(config)
        try:
            model.fit(X_train, y_train)
        except Exception as e:
            print(f" Window {w+1}: training failed — {e}", file=sys.stderr)
            continue

        # Get predictions on test set
        try:
            proba = model.predict_proba(X_test)[:, 1]
        except Exception:
            # Fall back to hard labels for models without probabilities.
            proba = model.predict(X_test).astype(float)

        # Extract feature importances (best effort; never aborts the window)
        try:
            importances = _normalized_importances(model, n_features)
        except Exception as e:
            print(f" Window {w+1}: feature importances unavailable: {e}", file=sys.stderr)
            importances = None
        if importances is not None:
            feature_importances_sum += importances
            fi_count += 1

        # Backtest on test set
        trades = backtest(test_df, proba, strategy)
        all_trades.extend(trades)

        # Window sharpe
        # NOTE(review): the sqrt(252 / trade_count) scaling mirrors
        # compile_results; confirm it is the intended annualisation.
        if trades:
            returns = [t["return_pct"] for t in trades]
            mean_r = np.mean(returns)
            std_r = np.std(returns) if len(returns) > 1 else 1.0
            sharpe = (mean_r / std_r) * np.sqrt(252 / max(1, len(trades))) if std_r > 0 else 0
            per_window_sharpe.append(round(float(sharpe), 3))
        else:
            per_window_sharpe.append(0.0)

        print(f" Window {w+1}/{n_windows}: {len(trades)} trades, sharpe={per_window_sharpe[-1]}")

    return compile_results(all_trades, per_window_sharpe, feature_importances_sum, fi_count, feature_cols, df)
|
|
|
|
|
|
def backtest(test_df: pd.DataFrame, proba: np.ndarray, strategy: dict) -> list:
    """Simulate long-only trades using model predictions.

    A trade opens at the close of a candle whose predicted probability
    reaches both ``entry_threshold`` and ``min_confidence_to_trade``. On each
    later candle the exits are checked in this order: fixed stop loss, take
    profit, then (when ``exit_type`` is ``"trailing_stop"``) trailing stop
    measured from the highest high since entry. A trade still open at the
    end of the data closes at the last close. Only one trade is open at a
    time, and the search for the next entry resumes on the candle after the
    exit candle.

    Args:
        test_df: Frame with ``close``, ``high`` and ``low`` columns.
        proba: Predicted probability for each row of ``test_df``.
        strategy: Optional settings: ``entry_threshold`` (0.6),
            ``min_confidence_to_trade`` (0.55), ``stop_loss_pct`` (2.0),
            ``take_profit_pct`` (4.0), ``trailing_stop_pct`` (1.5),
            ``exit_type`` (``"trailing_stop"``), ``position_sizing``
            (``"confidence_scaled"`` enables size multipliers) and
            ``fee_pct`` (0.1, charged once on entry and once on exit).

    Returns:
        One dict per trade with ``entry_idx``, ``exit_idx``, ``entry_price``,
        ``exit_price``, ``return_pct`` (net of fees, scaled by position
        size), ``confidence``, ``size_mult`` and ``duration`` (candles
        between entry and exit). Indices are positions within ``test_df``.
    """
    entry_threshold = strategy.get("entry_threshold", 0.6)
    min_confidence = strategy.get("min_confidence_to_trade", 0.55)
    stop_loss = strategy.get("stop_loss_pct", 2.0) / 100
    take_profit = strategy.get("take_profit_pct", 4.0) / 100
    trailing_stop = strategy.get("trailing_stop_pct", 1.5) / 100
    use_trailing = strategy.get("exit_type", "trailing_stop") == "trailing_stop"
    confidence_scaled = strategy.get("position_sizing") == "confidence_scaled"
    fee = strategy.get("fee_pct", 0.1) / 100  # per side

    # Both thresholds must be met, i.e. the stricter of the two applies.
    required_confidence = max(entry_threshold, min_confidence)

    closes = test_df["close"].values
    highs = test_df["high"].values
    lows = test_df["low"].values
    n = len(closes)
    trades = []
    i = 0

    while i < n - 1:
        confidence = proba[i]
        # Written as `not >=` so that a NaN probability never opens a trade.
        if not (confidence >= required_confidence):
            i += 1
            continue

        # Enter trade
        entry_price = closes[i]

        # Position sizing based on confidence
        if not confidence_scaled or confidence > 0.8:
            size_mult = 1.0
        elif confidence > 0.65:
            size_mult = 0.75
        else:
            size_mult = 0.5

        # Defaults describe a trade that survives to the end of the data.
        exit_idx = n - 1
        exit_price = closes[-1]
        peak = entry_price

        for j in range(i + 1, n):
            peak = max(peak, highs[j])

            # Check stop loss
            if (entry_price - lows[j]) / entry_price >= stop_loss:
                exit_idx, exit_price = j, entry_price * (1 - stop_loss)
                break
            # Check take profit
            if (highs[j] - entry_price) / entry_price >= take_profit:
                exit_idx, exit_price = j, entry_price * (1 + take_profit)
                break
            # Check trailing stop
            if use_trailing and (peak - lows[j]) / peak >= trailing_stop:
                exit_idx, exit_price = j, peak * (1 - trailing_stop)
                break

        raw_return = (exit_price - entry_price) / entry_price
        net_return = (raw_return - 2 * fee) * size_mult  # entry + exit fees

        trades.append({
            "entry_idx": i,
            "exit_idx": exit_idx,
            "entry_price": float(entry_price),
            "exit_price": float(exit_price),
            "return_pct": float(net_return * 100),
            "confidence": float(confidence),
            "size_mult": float(size_mult),
            # Derived from exit_idx so it stays consistent when the trade
            # runs to the final candle.
            "duration": exit_idx - i,
        })

        i = exit_idx + 1  # Skip to after exit

    return trades
|
|
|
|
|
|
def compile_results(trades: list, per_window_sharpe: list,
                    fi_sum: np.ndarray, fi_count: int,
                    feature_cols: list, df: pd.DataFrame) -> dict:
    """Compile all results into output JSON.

    Args:
        trades: Trade dicts carrying at least ``return_pct`` (percent) and
            ``duration``, in the order they were taken.
        per_window_sharpe: Per-window Sharpe values, passed through unchanged.
        fi_sum: Sum of per-window normalised feature importances.
        fi_count: Number of windows that contributed to ``fi_sum``.
        feature_cols: Feature names aligned with ``fi_sum``.
        df: Unused; kept so existing callers keep working.

    Returns:
        A JSON-serialisable dict of summary metrics. ``equity_curve`` holds
        every point when there are at most 100 and otherwise exactly 100
        evenly spaced samples that always include the first and last point.
    """
    if not trades:
        return {
            "sharpe_ratio": 0.0,
            "total_return_pct": 0.0,
            "max_drawdown_pct": 0.0,
            "win_rate": 0.0,
            "trade_count": 0,
            "profit_factor": 0.0,
            "avg_trade_duration_candles": 0.0,
            "feature_importances": {},
            "monthly_returns": [],
            "equity_curve": [],
            "per_window_sharpe": per_window_sharpe,
        }

    returns = [t["return_pct"] for t in trades]
    wins = [r for r in returns if r > 0]
    losses = [r for r in returns if r <= 0]

    # Compound the per-trade returns into an equity curve starting at 1.0.
    equity = [1.0]
    for r in returns:
        equity.append(equity[-1] * (1 + r / 100))
    total_return = equity[-1]

    # Max drawdown: most negative fractional drop from the running peak.
    peak_eq = equity[0]
    max_dd = 0.0
    for eq in equity:
        peak_eq = max(peak_eq, eq)
        max_dd = min(max_dd, (eq - peak_eq) / peak_eq)

    # Sharpe (annualized approximation)
    # NOTE(review): sqrt(252 / trade_count) shrinks as trades increase;
    # confirm this is the intended annualisation.
    mean_r = np.mean(returns)
    std_r = np.std(returns) if len(returns) > 1 else 1.0
    trades_per_year = 252  # approximate
    sharpe = (mean_r / std_r) * np.sqrt(trades_per_year / max(1, len(returns))) if std_r > 0 else 0

    # Profit factor; with no losing trades the divisor falls back to 1.
    gross_profit = sum(wins) if wins else 0
    gross_loss = abs(sum(losses)) if losses else 1
    profit_factor = gross_profit / gross_loss if gross_loss > 0 else gross_profit

    # Feature importances: window average, top 30 by importance.
    fi_avg = fi_sum / max(fi_count, 1)
    fi_sorted = sorted(zip(feature_cols, fi_avg), key=lambda x: -x[1])
    feature_importances = {name: round(float(val), 4) for name, val in fi_sorted[:30]}

    # Monthly returns (approximate by grouping trades)
    trades_per_month = max(1, len(trades) // 12)
    monthly_returns = [
        round(float(sum(returns[i:i + trades_per_month])), 2)
        for i in range(0, len(returns), trades_per_month)
    ]

    # Sample equity curve to 100 points, keeping both end points so the
    # curve's final value agrees with total_return_pct.
    max_points = 100
    if len(equity) > max_points:
        last = len(equity) - 1
        sample_idx = [round(k * last / (max_points - 1)) for k in range(max_points)]
        equity_sampled = [round(equity[i], 4) for i in sample_idx]
    else:
        equity_sampled = [round(e, 4) for e in equity]

    return {
        "sharpe_ratio": round(float(sharpe), 3),
        "total_return_pct": round(float((total_return - 1) * 100), 2),
        "max_drawdown_pct": round(float(max_dd * 100), 2),
        "win_rate": round(len(wins) / len(returns), 3),
        "trade_count": len(trades),
        "profit_factor": round(float(profit_factor), 3),
        "avg_trade_duration_candles": round(float(np.mean([t["duration"] for t in trades])), 1),
        "feature_importances": feature_importances,
        "monthly_returns": monthly_returns,
        "equity_curve": equity_sampled,
        "per_window_sharpe": per_window_sharpe,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """Command-line entry point: load data, build features, train and backtest.

    Reads the JSON config given by ``--config`` and the OHLCV CSV given by
    ``--data``, runs feature engineering, target labelling and walk-forward
    validation, then writes the metrics dictionary as JSON to ``--output``.

    Raises:
        ValueError: If the CSV lacks one of the required OHLCV columns.
    """
    # Progress messages contain non-ASCII characters; when stdout is a pipe
    # with a legacy code page (common on Windows) printing them would raise
    # UnicodeEncodeError, so unencodable characters are replaced instead.
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(errors="replace")

    parser = argparse.ArgumentParser(description="BTC ML Trading — Train & Backtest")
    parser.add_argument("--config", required=True, help="Path to config JSON")
    parser.add_argument("--data", required=True, help="Path to OHLCV CSV")
    parser.add_argument("--output", required=True, help="Path to output results JSON")
    args = parser.parse_args()

    # Load config
    with open(args.config, encoding="utf-8") as f:
        config = json.load(f)

    # Load data
    print(f"Loading data from {args.data}...")
    df = pd.read_csv(args.data, parse_dates=["timestamp"])
    missing = [col for col in ("open", "high", "low", "close", "volume") if col not in df.columns]
    if missing:
        raise ValueError(f"{args.data} is missing required columns: {missing}")
    # Rolling features, future-looking labels and walk-forward splits all
    # treat row order as time order, so enforce chronological order.
    df = df.sort_values("timestamp", kind="stable").reset_index(drop=True)
    print(f" {len(df)} rows, {df['timestamp'].iloc[0]} → {df['timestamp'].iloc[-1]}")

    # Compute features
    print("Computing features...")
    df = compute_features(df, config)

    # Create target
    print("Creating target labels...")
    df["target"] = create_target(df, config)

    # Drop NaN rows (indicator warm-up rows and rows without a label)
    non_feature_cols = {"timestamp", "open", "high", "low", "close", "volume", "target"}
    feature_cols = [c for c in df.columns if c not in non_feature_cols]
    df = df.dropna(subset=feature_cols + ["target"]).reset_index(drop=True)
    print(f" {len(df)} rows after dropping NaN, {len(feature_cols)} features")
    print(f" Target distribution: {df['target'].value_counts().to_dict()}")

    # Run walk-forward training + backtesting
    print("\nRunning walk-forward validation...")
    results = walk_forward_train_test(df, feature_cols, config)

    # Save results
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\nResults saved to {args.output}")
    print(f" Sharpe: {results['sharpe_ratio']}, Return: {results['total_return_pct']}%, "
          f"Win Rate: {results['win_rate']}, Trades: {results['trade_count']}")
|
|
|
|
|
|
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|