btc-accumulation-monitor/orchestrator.py
BizzleBot 8ff35c1a86 feat: complete BTC ML trading strategy optimizer
Multi-machine optimization loop:
- VPS orchestrator coordinates training and LLM analysis
- Windows PC (RTX 4070 Ti) runs XGBoost/LightGBM/CatBoost with GPU
- Mac Mini runs qwen3.5:27b via Ollama for strategy analysis

Includes 60+ technical features, walk-forward validation,
confidence-scaled position sizing, and automated convergence detection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 21:25:44 +00:00

328 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
BTC ML Trading Strategy Optimizer — Orchestrator
Coordinates the optimization loop across VPS, Windows PC (GPU), and Mac Mini (LLM).
"""
import copy
import json
import os
import random
import shlex
import subprocess
import sys
import time
from datetime import datetime, timezone
# --- Local paths, all resolved relative to the directory holding this script ---
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, "data")
CONFIG_DIR = os.path.join(BASE_DIR, "config")
RESULTS_DIR = os.path.join(BASE_DIR, "results")
# One JSON object per line, appended after every completed iteration.
ITERATIONS_LOG = os.path.join(RESULTS_DIR, "iterations.jsonl")

# --- Remote machines (SSH targets) ---
WINDOWS_HOST = "bizzle@100.76.218.38"
WINDOWS_DIR = "~/btc-ml-optimizer"
MAC_MINI_HOST = "bizzle@bizzles-mac-mini-1"

# --- Loop limits and convergence criteria ---
MAX_ITERATIONS = 50
CONVERGENCE_WINDOW = 5  # number of most recent iterations inspected for a stall
CONVERGENCE_THRESHOLD = 0.01  # 1% improvement
TARGET_SHARPE = 3.0  # stop as soon as any iteration reaches this Sharpe
ML_TIMEOUT = 10 * 60  # seconds (10 minutes) allowed for one remote training run
# Colors
class C:
    """ANSI escape sequences used to style terminal output."""

    # Text attributes
    RESET = "\033[0m"
    BOLD = "\033[1m"
    DIM = "\033[2m"

    # Bright foreground colors
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    MAGENTA = "\033[95m"
    CYAN = "\033[96m"
def log(msg, color=""):
    """Print *msg* to stdout behind a dimmed UTC ``HH:MM:SS`` timestamp.

    ``color`` is an optional escape-sequence prefix applied to the message;
    styling is always reset at the end of the line.
    """
    stamp = datetime.now(timezone.utc).strftime("%H:%M:%S")
    prefix = f"{C.DIM}[{stamp}]{C.RESET}"
    print(f"{prefix} {color}{msg}{C.RESET}")
def run_cmd(cmd, timeout=120, check=True):
    """Execute *cmd* through the shell and return its stripped stdout.

    Raises RuntimeError (carrying the command and its stderr) when ``check``
    is true and the command exits non-zero. ``subprocess.TimeoutExpired``
    propagates if the command runs longer than ``timeout`` seconds.
    """
    proc = subprocess.run(
        cmd,
        shell=True,
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    failed = proc.returncode != 0
    if failed and check:
        raise RuntimeError(f"Command failed: {cmd}\n{proc.stderr}")
    return proc.stdout.strip()
def ensure_data():
    """Fetch the BTC candle CSVs unless both are already present.

    Looks for ``btc_4h.csv`` and ``btc_1h.csv`` in ``DATA_DIR``. If either is
    missing, runs ``scripts/fetch_data.py`` from ``BASE_DIR`` with a 300-second
    timeout.

    Raises:
        RuntimeError: if the fetch script exits non-zero (raised by ``run_cmd``).
        subprocess.TimeoutExpired: if the fetch script exceeds the timeout.
    """
    required = [os.path.join(DATA_DIR, name) for name in ("btc_4h.csv", "btc_1h.csv")]
    if all(os.path.exists(path) for path in required):
        log("Data files already exist", C.GREEN)
        return
    log("Fetching BTC data...", C.YELLOW)
    # BASE_DIR is derived from this file's location and may contain spaces or
    # shell metacharacters; quote it so `cd` receives it as a single argument.
    run_cmd(f"cd {shlex.quote(BASE_DIR)} && python3 scripts/fetch_data.py", timeout=300)
def setup_windows_remote():
    """Create the working directory on the Windows machine over SSH."""
    log("Ensuring Windows remote directory exists...", C.CYAN)
    mkdir_cmd = f'ssh {WINDOWS_HOST} "mkdir -p {WINDOWS_DIR}"'
    run_cmd(mkdir_cmd, timeout=30)
def scp_to_windows(local_path, remote_name):
    """Copy a local file into the working directory on the Windows PC.

    Args:
        local_path: path of the file on this machine.
        remote_name: file name to create under ``WINDOWS_DIR`` on the remote.

    Raises:
        RuntimeError: if ``scp`` exits non-zero (raised by ``run_cmd``).
        subprocess.TimeoutExpired: if the copy takes longer than 60 seconds.
    """
    # The command goes through the local shell, so quote the local path to
    # survive spaces/metacharacters. The remote spec is passed through as-is.
    source = shlex.quote(str(local_path))
    target = f"{WINDOWS_HOST}:{WINDOWS_DIR}/{remote_name}"
    run_cmd(f"scp -q {source} {target}", timeout=60)
def scp_from_windows(remote_name, local_path):
    """Copy a file from the working directory on the Windows PC to this machine.

    Args:
        remote_name: file name under ``WINDOWS_DIR`` on the remote.
        local_path: destination path on this machine.

    Raises:
        RuntimeError: if ``scp`` exits non-zero (raised by ``run_cmd``).
        subprocess.TimeoutExpired: if the copy takes longer than 60 seconds.
    """
    # The command goes through the local shell, so quote the local path to
    # survive spaces/metacharacters. The remote spec is passed through as-is.
    source = f"{WINDOWS_HOST}:{WINDOWS_DIR}/{remote_name}"
    target = shlex.quote(str(local_path))
    run_cmd(f"scp -q {source} {target}", timeout=60)
def run_ml_training():
    """Launch ``train_and_backtest.py`` on the Windows PC over SSH and echo its output.

    Returns True on success. Raises RuntimeError (with stderr and stdout) when
    the remote command exits non-zero; ``subprocess.TimeoutExpired`` propagates
    if it runs longer than ``ML_TIMEOUT`` seconds.
    """
    remote_cmd = (
        f"cd {WINDOWS_DIR} && python train_and_backtest.py "
        "--config config.json --data btc_4h.csv --output results.json"
    )
    ssh_cmd = f'ssh {WINDOWS_HOST} "{remote_cmd}"'
    log("Running ML training on Windows PC (GPU)...", C.MAGENTA)
    proc = subprocess.run(
        ssh_cmd, shell=True, capture_output=True, text=True, timeout=ML_TIMEOUT
    )
    if proc.returncode != 0:
        raise RuntimeError(f"ML training failed:\n{proc.stderr}\n{proc.stdout}")
    # Relay the captured training output, one log line per output line.
    for out_line in proc.stdout.strip().split("\n"):
        log(f" {C.DIM}{out_line}", C.DIM)
    return True
def load_iteration_history(path=None):
    """Load previously logged iterations from a JSON Lines file.

    Args:
        path: file to read; defaults to ``ITERATIONS_LOG``.

    Returns:
        A list with one decoded object per non-blank line, in file order,
        or an empty list if the file does not exist.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    if path is None:
        path = ITERATIONS_LOG
    try:
        # Open directly rather than testing for existence first, so the file
        # cannot disappear between the check and the open.
        with open(path, encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]
    except FileNotFoundError:
        return []
def save_iteration(iteration_data, path=None):
    """Append one iteration record to the JSON Lines log.

    Args:
        iteration_data: JSON-serializable object describing the iteration.
        path: file to append to; defaults to ``ITERATIONS_LOG``.

    Raises:
        TypeError: if ``iteration_data`` is not JSON-serializable.
    """
    if path is None:
        path = ITERATIONS_LOG
    # Serialize before opening so a non-serializable record never creates or
    # touches the log file.
    record = json.dumps(iteration_data)
    with open(path, "a", encoding="utf-8") as f:
        f.write(record + "\n")
def check_convergence(history, *, window=None, threshold=None, target_sharpe=None):
    """Decide whether the optimization loop should stop.

    Args:
        history: list of iteration records, each with a numeric ``"sharpe"``.
        window: number of most recent iterations inspected for a stall;
            defaults to ``CONVERGENCE_WINDOW``.
        threshold: relative spread below which the window counts as stalled;
            defaults to ``CONVERGENCE_THRESHOLD``.
        target_sharpe: Sharpe ratio that ends the search outright; defaults
            to ``TARGET_SHARPE``.

    Returns:
        ``(converged, reason)``: a bool plus a human-readable explanation
        (empty string when there is enough history but no convergence).
    """
    window = CONVERGENCE_WINDOW if window is None else window
    threshold = CONVERGENCE_THRESHOLD if threshold is None else threshold
    target_sharpe = TARGET_SHARPE if target_sharpe is None else target_sharpe

    if not history:
        return False, "Not enough iterations"

    # The target check scans the whole history, so it does not need a full
    # stall window: stop as soon as any iteration has reached the target.
    best_sharpe = max(h["sharpe"] for h in history)
    if best_sharpe >= target_sharpe:
        return True, f"Target Sharpe reached: {best_sharpe:.3f}"

    # Stall detection needs more than `window` iterations on record.
    if len(history) < window + 1:
        return False, "Not enough iterations"

    recent_sharpes = [h["sharpe"] for h in history[-window:]]
    best_recent = max(recent_sharpes)
    spread = best_recent - min(recent_sharpes)
    # Relative spread (max - min) / max over the window. It is only evaluated
    # for a positive best value, which also guards the division; a window
    # whose best Sharpe is <= 0 is never reported as converged.
    if best_recent > 0 and spread / best_recent < threshold:
        return True, f"Converged: Sharpe variance < {threshold*100}% over {window} iterations"
    return False, ""
def print_header():
    """Print the startup banner in bold cyan."""
    banner = f"""
{C.BOLD}{C.CYAN}╔══════════════════════════════════════════════════╗
║ BTC ML Trading Strategy Optimizer ║
║ VPS → Windows GPU → Mac Mini LLM → Loop ║
╚══════════════════════════════════════════════════╝{C.RESET}
"""
    print(banner)
def print_results(results, iteration):
    """Print the backtest metrics of one iteration as a formatted report.

    Metrics missing from ``results`` are shown as 0 (or an empty list for the
    per-window Sharpes). The Sharpe ratio is green above 1.5, yellow above
    1.0, and red otherwise.
    """
    sharpe = results.get("sharpe_ratio", 0)
    if sharpe > 1.5:
        sharpe_color = C.GREEN
    elif sharpe > 1.0:
        sharpe_color = C.YELLOW
    else:
        sharpe_color = C.RED

    total_return = results.get('total_return_pct', 0)
    max_drawdown = results.get('max_drawdown_pct', 0)
    win_rate = results.get('win_rate', 0)
    trade_count = results.get('trade_count', 0)
    profit_factor = results.get('profit_factor', 0)
    avg_duration = results.get('avg_trade_duration_candles', 0)
    window_sharpes = results.get('per_window_sharpe', [])

    report = f"""
{C.BOLD}━━━ Iteration {iteration} Results ━━━{C.RESET}
Sharpe Ratio: {sharpe_color}{C.BOLD}{sharpe:.3f}{C.RESET}
Total Return: {total_return:.1f}%
Max Drawdown: {max_drawdown:.1f}%
Win Rate: {win_rate:.1%}
Trade Count: {trade_count}
Profit Factor: {profit_factor:.3f}
Avg Duration: {avg_duration:.1f} candles
Window Sharpes: {window_sharpes}
"""
    print(report)
def _summarize_history(history):
    """Reduce each history record to the fields passed to the LLM analyzer."""
    fields = ("iteration", "sharpe", "return", "win_rate", "trades", "model_type")
    return [{key: h[key] for key in fields} for h in history]


def _perturb_config(config):
    """Return a copy of *config* with learning_rate and max_depth randomly nudged."""
    # Work on a deep copy so the caller's config (and any history record that
    # references it) is left untouched.
    perturbed = copy.deepcopy(config)
    hp = perturbed.get("hyperparameters", {})
    hp["learning_rate"] = hp.get("learning_rate", 0.05) * random.uniform(0.8, 1.2)
    hp["max_depth"] = max(3, min(10, hp.get("max_depth", 6) + random.choice([-1, 0, 1])))
    perturbed["hyperparameters"] = hp
    return perturbed


def main():
    """Run the optimization loop end to end.

    Steps: ensure the BTC data exists, load ``best_config.json`` (or
    ``initial_config.json`` when no best config exists yet), upload the ML
    engine and data to the Windows PC, then for each iteration: upload the
    config, train remotely, fetch and print the results, record the iteration,
    check convergence, and ask the LLM analyzer for the next config. If the
    analyzer call fails, a small random perturbation of the current config is
    used instead. If training fails, the config of the last recorded iteration
    is restored and the loop moves on to the next iteration.
    """
    print_header()
    os.makedirs(RESULTS_DIR, exist_ok=True)

    # Step 1: Ensure data
    ensure_data()

    # Step 2: Load or create initial config
    config_path = os.path.join(CONFIG_DIR, "initial_config.json")
    best_config_path = os.path.join(CONFIG_DIR, "best_config.json")
    # Resume from best config if it exists
    if os.path.exists(best_config_path):
        log("Resuming from best_config.json", C.GREEN)
        with open(best_config_path) as f:
            config = json.load(f)
    else:
        with open(config_path) as f:
            config = json.load(f)

    history = load_iteration_history()
    start_iter = len(history) + 1
    best_sharpe = max((h["sharpe"] for h in history), default=0)
    log(f"Starting at iteration {start_iter}, best Sharpe so far: {best_sharpe:.3f}", C.BOLD)

    # Step 3: Setup Windows remote
    setup_windows_remote()
    # SCP the ML engine script (once)
    log("Uploading ML engine to Windows...", C.CYAN)
    scp_to_windows(os.path.join(BASE_DIR, "ml_engine", "train_and_backtest.py"), "train_and_backtest.py")
    # SCP data files (once)
    for tf in ["1h", "4h"]:
        data_file = os.path.join(DATA_DIR, f"btc_{tf}.csv")
        if os.path.exists(data_file):
            log(f"Uploading btc_{tf}.csv to Windows...", C.CYAN)
            scp_to_windows(data_file, f"btc_{tf}.csv")

    # Import LLM analyzer (lives in the llm_client directory next to this script)
    sys.path.insert(0, os.path.join(BASE_DIR, "llm_client"))
    from analyzer import analyze_and_suggest

    # Main optimization loop
    for iteration in range(start_iter, MAX_ITERATIONS + 1):
        log(f"\n{'='*50}", C.BOLD)
        log(f"ITERATION {iteration}/{MAX_ITERATIONS}", f"{C.BOLD}{C.CYAN}")
        log(f"Model: {config.get('model_type', 'unknown')}, "
            f"LR: {config.get('hyperparameters', {}).get('learning_rate', '?')}, "
            f"Depth: {config.get('hyperparameters', {}).get('max_depth', '?')}", C.DIM)
        log(f"{'='*50}", C.BOLD)

        # Write current config to temp file and SCP
        tmp_config = os.path.join(BASE_DIR, "config", "current_config.json")
        with open(tmp_config, "w") as f:
            json.dump(config, f, indent=2)
        scp_to_windows(tmp_config, "config.json")

        # Run ML training on Windows
        try:
            run_ml_training()
        except (RuntimeError, subprocess.TimeoutExpired) as e:
            log(f"ML training failed: {e}", C.RED)
            log("Reverting to previous config and continuing...", C.YELLOW)
            if history:
                # Copy so later changes to `config` can never alter the
                # recorded history entry.
                config = copy.deepcopy(history[-1].get("config", config))
            continue

        # Fetch results from Windows
        results_local = os.path.join(RESULTS_DIR, f"results_iter_{iteration}.json")
        scp_from_windows("results.json", results_local)
        with open(results_local) as f:
            results = json.load(f)
        print_results(results, iteration)

        # Track best
        current_sharpe = results.get("sharpe_ratio", 0)
        is_best = current_sharpe > best_sharpe
        if is_best:
            best_sharpe = current_sharpe
            with open(best_config_path, "w") as f:
                json.dump(config, f, indent=2)
            log(f"NEW BEST! Sharpe: {best_sharpe:.3f}", f"{C.BOLD}{C.GREEN}")

        # Log iteration
        iter_data = {
            "iteration": iteration,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "sharpe": current_sharpe,
            "return": results.get("total_return_pct", 0),
            "max_drawdown": results.get("max_drawdown_pct", 0),
            "win_rate": results.get("win_rate", 0),
            "trades": results.get("trade_count", 0),
            "profit_factor": results.get("profit_factor", 0),
            "model_type": config.get("model_type", "unknown"),
            "is_best": is_best,
            # Snapshot, not a reference: the live config may be modified for
            # the next iteration, and the in-memory history must keep the
            # config that actually produced these results.
            "config": copy.deepcopy(config),
        }
        save_iteration(iter_data)
        history.append(iter_data)

        # Check convergence
        converged, reason = check_convergence(history)
        if converged:
            log(f"\nOptimization converged: {reason}", f"{C.BOLD}{C.GREEN}")
            break
        if iteration >= MAX_ITERATIONS:
            log(f"\nMax iterations ({MAX_ITERATIONS}) reached.", C.YELLOW)
            break

        # Ask LLM for next config
        log("\nConsulting LLM for strategy modifications...", C.MAGENTA)
        try:
            summary_history = _summarize_history(history)
            new_config, reasoning = analyze_and_suggest(config, results, summary_history)
            log(f" LLM reasoning: {reasoning[:200]}...", C.DIM)
            config = new_config
        except Exception as e:
            # Deliberate best-effort: any analyzer failure falls back to a
            # small random perturbation so the loop keeps making progress.
            log(f"LLM call failed: {e}", C.RED)
            log("Continuing with current config + random perturbation...", C.YELLOW)
            config = _perturb_config(config)

    # Final summary
    print(f"""
{C.BOLD}{C.GREEN}╔══════════════════════════════════════════════════╗
║ Optimization Complete! ║
╚══════════════════════════════════════════════════╝{C.RESET}
Total Iterations: {len(history)}
Best Sharpe: {C.BOLD}{best_sharpe:.3f}{C.RESET}
Best Config: {best_config_path}
Iteration Log: {ITERATIONS_LOG}
""")
# Run the optimizer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()