#!/usr/bin/env python3
"""
BTC ML Trading Strategy Optimizer — Orchestrator

Coordinates the optimization loop across VPS, Windows PC (GPU), and Mac Mini (LLM).

Each iteration:
  1. uploads the current strategy config to the Windows PC,
  2. runs ``train_and_backtest.py`` there over SSH,
  3. copies ``results.json`` back and appends a record to ``results/iterations.jsonl``,
  4. asks ``analyze_and_suggest`` (from ``llm_client/analyzer.py``) for the next config.

The loop stops when ``check_convergence`` reports convergence or
``MAX_ITERATIONS`` is reached. It can be run as a CLI (``main``) or driven
from a dashboard thread (``run_optimization_loop`` / ``get_status`` /
``request_stop``).
"""

import copy
import json
import os
import random
import shlex
import subprocess
import sys
import threading
import time
from datetime import datetime, timezone

# Paths
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, "data")
CONFIG_DIR = os.path.join(BASE_DIR, "config")
RESULTS_DIR = os.path.join(BASE_DIR, "results")
ITERATIONS_LOG = os.path.join(RESULTS_DIR, "iterations.jsonl")
_INITIAL_CONFIG_PATH = os.path.join(CONFIG_DIR, "initial_config.json")
_BEST_CONFIG_PATH = os.path.join(CONFIG_DIR, "best_config.json")

# Remote machines
WINDOWS_HOST = "bizzle@100.76.218.38"
WINDOWS_DIR = "btc-ml-optimizer"
MAC_MINI_HOST = "bizzle@bizzles-mac-mini-1"

# Convergence
MAX_ITERATIONS = 50
CONVERGENCE_WINDOW = 5
CONVERGENCE_THRESHOLD = 0.01  # 1% improvement
TARGET_SHARPE = 3.0
ML_TIMEOUT = 600  # 10 minutes

# Errors that mean "this iteration failed, skip it" rather than "abort the run":
# non-zero remote exit / scp failure (RuntimeError), timeouts, missing local
# results file (OSError) and unparsable results JSON (ValueError).
_ITERATION_ERRORS = (RuntimeError, subprocess.TimeoutExpired, OSError, ValueError)


# Colors
class C:
    """ANSI escape sequences used for terminal output."""

    BOLD = "\033[1m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    RED = "\033[91m"
    CYAN = "\033[96m"
    MAGENTA = "\033[95m"
    DIM = "\033[2m"
    RESET = "\033[0m"


def log(msg, color=""):
    """Print *msg* wrapped in *color*, prefixed with a dim UTC ``HH:MM:SS`` timestamp."""
    ts = datetime.now(timezone.utc).strftime("%H:%M:%S")
    print(f"{C.DIM}[{ts}]{C.RESET} {color}{msg}{C.RESET}")


def run_cmd(cmd, timeout=120, check=True):
    """Run a shell command and return its stripped stdout.

    Args:
        cmd: Command string, executed with ``shell=True``; callers must quote
            any interpolated values.
        timeout: Seconds before ``subprocess.TimeoutExpired`` is raised.
        check: When true, a non-zero exit status raises ``RuntimeError``.

    Raises:
        RuntimeError: on non-zero exit when *check* is true (message includes stderr).
        subprocess.TimeoutExpired: when the command exceeds *timeout*.
    """
    result = subprocess.run(
        cmd, shell=True, capture_output=True, text=True, timeout=timeout
    )
    if check and result.returncode != 0:
        raise RuntimeError(f"Command failed: {cmd}\n{result.stderr}")
    return result.stdout.strip()


def ensure_data():
    """Run ``scripts/fetch_data.py`` unless both ``btc_4h.csv`` and ``btc_1h.csv`` exist in DATA_DIR."""
    data_4h = os.path.join(DATA_DIR, "btc_4h.csv")
    data_1h = os.path.join(DATA_DIR, "btc_1h.csv")
    if os.path.exists(data_4h) and os.path.exists(data_1h):
        log("Data files already exist", C.GREEN)
        return
    log("Fetching BTC data...", C.YELLOW)
    # Quote BASE_DIR: the command goes through a shell and the path may contain spaces.
    run_cmd(f"cd {shlex.quote(BASE_DIR)} && python3 scripts/fetch_data.py", timeout=300)


def setup_windows_remote():
    """Ensure the remote directory exists on Windows (cmd.exe ``if not exist ... mkdir``)."""
    log("Ensuring Windows remote directory exists...", C.CYAN)
    run_cmd(f'ssh {WINDOWS_HOST} "if not exist {WINDOWS_DIR} mkdir {WINDOWS_DIR}"', timeout=30)


def scp_to_windows(local_path, remote_name):
    """Copy *local_path* to ``WINDOWS_DIR/remote_name`` on the Windows PC."""
    run_cmd(
        f"scp -q {shlex.quote(local_path)} {WINDOWS_HOST}:{WINDOWS_DIR}/{remote_name}",
        timeout=60,
    )


def scp_from_windows(remote_name, local_path):
    """Copy ``WINDOWS_DIR/remote_name`` from the Windows PC to *local_path*."""
    run_cmd(
        f"scp -q {WINDOWS_HOST}:{WINDOWS_DIR}/{remote_name} {shlex.quote(local_path)}",
        timeout=60,
    )


def run_ml_training():
    """Run ``train_and_backtest.py`` on the Windows PC over SSH and echo its stdout.

    The remote script reads ``config.json`` and ``btc_4h.csv`` and writes
    ``results.json`` in WINDOWS_DIR.

    Returns:
        True on success.

    Raises:
        RuntimeError: if the remote command exits non-zero.
        subprocess.TimeoutExpired: if it runs longer than ``ML_TIMEOUT`` seconds.
    """
    cmd = (
        f'ssh {WINDOWS_HOST} '
        f'"cd {WINDOWS_DIR} && python train_and_backtest.py '
        f'--config config.json --data btc_4h.csv --output results.json"'
    )
    log("Running ML training on Windows PC (GPU)...", C.MAGENTA)
    result = subprocess.run(
        cmd, shell=True, capture_output=True, text=True, timeout=ML_TIMEOUT
    )
    if result.returncode != 0:
        raise RuntimeError(f"ML training failed:\n{result.stderr}\n{result.stdout}")

    # Print training output
    for line in result.stdout.strip().split("\n"):
        log(f"  {C.DIM}{line}", C.DIM)
    return True


def load_iteration_history():
    """Return every record in ITERATIONS_LOG (one JSON object per non-blank line), or [] if absent."""
    history = []
    if os.path.exists(ITERATIONS_LOG):
        with open(ITERATIONS_LOG) as f:
            for line in f:
                line = line.strip()
                if line:
                    history.append(json.loads(line))
    return history


def save_iteration(iteration_data):
    """Append *iteration_data* as one JSON line to ITERATIONS_LOG."""
    with open(ITERATIONS_LOG, "a") as f:
        f.write(json.dumps(iteration_data) + "\n")


def check_convergence(history):
    """Decide whether the optimization should stop.

    Args:
        history: Iteration records, each with a numeric ``"sharpe"`` key.

    Returns:
        ``(converged, reason)``. Converged when the best Sharpe so far reaches
        ``TARGET_SHARPE``, or when the last ``CONVERGENCE_WINDOW`` Sharpes span
        less than ``CONVERGENCE_THRESHOLD`` relative to their maximum.
    """
    if not history:
        return False, "Not enough iterations"

    # Check the target first: reaching it must stop the run immediately,
    # not only once CONVERGENCE_WINDOW + 1 iterations have accumulated.
    best_sharpe = max(h["sharpe"] for h in history)
    if best_sharpe >= TARGET_SHARPE:
        return True, f"Target Sharpe reached: {best_sharpe:.3f}"

    if len(history) < CONVERGENCE_WINDOW + 1:
        return False, "Not enough iterations"

    # Check if improvement has stalled
    sharpes = [h["sharpe"] for h in history[-CONVERGENCE_WINDOW:]]
    best_recent = max(sharpes)
    worst_recent = min(sharpes)
    if best_recent > 0 and (best_recent - worst_recent) / best_recent < CONVERGENCE_THRESHOLD:
        return True, (
            f"Converged: Sharpe variance < {CONVERGENCE_THRESHOLD*100}% "
            f"over {CONVERGENCE_WINDOW} iterations"
        )

    return False, ""


def print_header():
    """Print the startup banner."""
    print(f"""
{C.BOLD}{C.CYAN}╔══════════════════════════════════════════════════╗
║   BTC ML Trading Strategy Optimizer              ║
║   VPS → Windows GPU → Mac Mini LLM → Loop        ║
╚══════════════════════════════════════════════════╝{C.RESET}
""")


def print_results(results, iteration):
    """Pretty-print one iteration's metrics; Sharpe is green above 1.5, yellow above 1.0, else red."""
    sharpe = results.get("sharpe_ratio", 0)
    sharpe_color = C.GREEN if sharpe > 1.5 else C.YELLOW if sharpe > 1.0 else C.RED
    print(f"""
{C.BOLD}━━━ Iteration {iteration} Results ━━━{C.RESET}
  Sharpe Ratio:    {sharpe_color}{C.BOLD}{sharpe:.3f}{C.RESET}
  Total Return:    {results.get('total_return_pct', 0):.1f}%
  Max Drawdown:    {results.get('max_drawdown_pct', 0):.1f}%
  Win Rate:        {results.get('win_rate', 0):.1%}
  Trade Count:     {results.get('trade_count', 0)}
  Profit Factor:   {results.get('profit_factor', 0):.3f}
  Avg Duration:    {results.get('avg_trade_duration_candles', 0):.1f} candles
  Window Sharpes:  {results.get('per_window_sharpe', [])}
""")


# --- Shared helpers for main() and run_optimization_loop() ---

def _load_config(config_override=None):
    """Pick the starting config: *config_override*, else best_config.json, else initial_config.json.

    Returns:
        ``(config, resumed)``; *resumed* is True when best_config.json was loaded.
    """
    if config_override:
        return config_override, False
    if os.path.exists(_BEST_CONFIG_PATH):
        with open(_BEST_CONFIG_PATH) as f:
            return json.load(f), True
    with open(_INITIAL_CONFIG_PATH) as f:
        return json.load(f), False


def _upload_static_files(verbose=False):
    """Upload the ML engine script and whichever BTC CSVs exist (done once per run)."""
    if verbose:
        log("Uploading ML engine to Windows...", C.CYAN)
    scp_to_windows(os.path.join(BASE_DIR, "ml_engine", "train_and_backtest.py"),
                   "train_and_backtest.py")
    for tf in ("1h", "4h"):
        data_file = os.path.join(DATA_DIR, f"btc_{tf}.csv")
        if os.path.exists(data_file):
            if verbose:
                log(f"Uploading btc_{tf}.csv to Windows...", C.CYAN)
            scp_to_windows(data_file, f"btc_{tf}.csv")


def _import_analyzer():
    """Import and return ``analyze_and_suggest`` from ``llm_client/analyzer.py``.

    The directory is added to ``sys.path`` only once, so repeated dashboard
    runs don't keep prepending duplicates.
    """
    llm_dir = os.path.join(BASE_DIR, "llm_client")
    if llm_dir not in sys.path:
        sys.path.insert(0, llm_dir)
    from analyzer import analyze_and_suggest
    return analyze_and_suggest


def _upload_config(config):
    """Write *config* to config/current_config.json and upload it as ``config.json``."""
    tmp_config = os.path.join(CONFIG_DIR, "current_config.json")
    with open(tmp_config, "w") as f:
        json.dump(config, f, indent=2)
    scp_to_windows(tmp_config, "config.json")


def _fetch_results(iteration):
    """Copy the remote ``results.json`` to results/results_iter_<iteration>.json and parse it."""
    results_local = os.path.join(RESULTS_DIR, f"results_iter_{iteration}.json")
    scp_from_windows("results.json", results_local)
    with open(results_local) as f:
        return json.load(f)


def _build_iter_data(iteration, results, config, is_best, include_results=False):
    """Build the JSONL record for one iteration.

    The config is deep-copied so later in-place edits of the live config
    cannot rewrite history entries.
    """
    iter_data = {
        "iteration": iteration,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "sharpe": results.get("sharpe_ratio", 0),
        "return": results.get("total_return_pct", 0),
        "max_drawdown": results.get("max_drawdown_pct", 0),
        "win_rate": results.get("win_rate", 0),
        "trades": results.get("trade_count", 0),
        "profit_factor": results.get("profit_factor", 0),
        "model_type": config.get("model_type", "unknown"),
        "is_best": is_best,
        "config": copy.deepcopy(config),
    }
    if include_results:
        iter_data["results"] = results
    return iter_data


def _summarize_history(history):
    """Reduce history records to the compact fields sent to the LLM analyzer."""
    keys = ("iteration", "sharpe", "return", "win_rate", "trades", "model_type")
    return [{k: h[k] for k in keys} for h in history]


def _perturb_config(config):
    """Return a copy of *config* with a small random hyperparameter change.

    Fallback for when the LLM call fails: learning_rate (default 0.05) is
    scaled by a factor in [0.8, 1.2] and max_depth (default 6) moves by -1/0/+1,
    clamped to 3..10. Works on a deep copy so configs already recorded in
    history are never mutated.
    """
    new_config = copy.deepcopy(config)
    hp = new_config.get("hyperparameters", {})
    hp["learning_rate"] = hp.get("learning_rate", 0.05) * random.uniform(0.8, 1.2)
    hp["max_depth"] = max(3, min(10, hp.get("max_depth", 6) + random.choice([-1, 0, 1])))
    new_config["hyperparameters"] = hp
    return new_config


def main():
    """Run the optimization loop interactively, logging progress to stdout."""
    print_header()
    os.makedirs(RESULTS_DIR, exist_ok=True)

    # Step 1: Ensure data
    ensure_data()

    # Step 2: Load starting config (resume from best_config.json if it exists)
    config, resumed = _load_config()
    if resumed:
        log("Resuming from best_config.json", C.GREEN)

    history = load_iteration_history()
    start_iter = len(history) + 1
    best_sharpe = max((h["sharpe"] for h in history), default=0)
    log(f"Starting at iteration {start_iter}, best Sharpe so far: {best_sharpe:.3f}", C.BOLD)

    # Step 3: Setup Windows remote; engine script and data are uploaded once.
    setup_windows_remote()
    _upload_static_files(verbose=True)

    analyze_and_suggest = _import_analyzer()

    # Main optimization loop
    for iteration in range(start_iter, MAX_ITERATIONS + 1):
        hp_now = config.get("hyperparameters", {})
        log(f"\n{'='*50}", C.BOLD)
        log(f"ITERATION {iteration}/{MAX_ITERATIONS}", f"{C.BOLD}{C.CYAN}")
        log(f"Model: {config.get('model_type', 'unknown')}, "
            f"LR: {hp_now.get('learning_rate', '?')}, "
            f"Depth: {hp_now.get('max_depth', '?')}", C.DIM)
        log(f"{'='*50}", C.BOLD)

        _upload_config(config)

        # Train remotely and pull results back; any failure skips this iteration.
        try:
            run_ml_training()
            results = _fetch_results(iteration)
        except _ITERATION_ERRORS as e:
            log(f"ML training failed: {e}", C.RED)
            log("Reverting to previous config and continuing...", C.YELLOW)
            if history:
                config = copy.deepcopy(history[-1].get("config", config))
            continue

        print_results(results, iteration)

        # Track best
        current_sharpe = results.get("sharpe_ratio", 0)
        is_best = current_sharpe > best_sharpe
        if is_best:
            best_sharpe = current_sharpe
            with open(_BEST_CONFIG_PATH, "w") as f:
                json.dump(config, f, indent=2)
            log(f"NEW BEST! Sharpe: {best_sharpe:.3f}", f"{C.BOLD}{C.GREEN}")

        iter_data = _build_iter_data(iteration, results, config, is_best)
        save_iteration(iter_data)
        history.append(iter_data)

        converged, reason = check_convergence(history)
        if converged:
            log(f"\nOptimization converged: {reason}", f"{C.BOLD}{C.GREEN}")
            break

        if iteration >= MAX_ITERATIONS:
            log(f"\nMax iterations ({MAX_ITERATIONS}) reached.", C.YELLOW)
            break

        # Ask LLM for next config
        log("\nConsulting LLM for strategy modifications...", C.MAGENTA)
        try:
            new_config, reasoning = analyze_and_suggest(
                config, results, _summarize_history(history)
            )
        except Exception as e:  # broad on purpose: any LLM failure falls back to a perturbation
            log(f"LLM call failed: {e}", C.RED)
            log("Continuing with current config + random perturbation...", C.YELLOW)
            config = _perturb_config(config)
        else:
            log(f"  LLM reasoning: {str(reasoning)[:200]}...", C.DIM)
            config = new_config

    # Final summary
    print(f"""
{C.BOLD}{C.GREEN}╔══════════════════════════════════════════════════╗
║   Optimization Complete!                         ║
╚══════════════════════════════════════════════════╝{C.RESET}
  Total Iterations:  {len(history)}
  Best Sharpe:       {C.BOLD}{best_sharpe:.3f}{C.RESET}
  Best Config:       {_BEST_CONFIG_PATH}
  Iteration Log:     {ITERATIONS_LOG}
""")


# --- Library API for dashboard integration ---

# Shared state for dashboard
_stop_event = threading.Event()
_status = {
    "state": "idle",  # idle, running, completed, error
    "iteration": 0,
    "max_iterations": MAX_ITERATIONS,
    "best_sharpe": 0.0,
    "error": None,
    "llm_suggestions": [],  # list of {iteration, reasoning}
}
_status_lock = threading.Lock()


def get_status():
    """Return a snapshot of the optimization status (thread-safe).

    ``llm_suggestions`` is copied as well, so callers can iterate it while
    the worker thread keeps appending to the live list.
    """
    with _status_lock:
        snapshot = dict(_status)
        snapshot["llm_suggestions"] = list(_status["llm_suggestions"])
    return snapshot


def update_status(**kwargs):
    """Update status fields (thread-safe)."""
    with _status_lock:
        _status.update(kwargs)


def run_optimization_loop(callback=None, config_override=None):
    """
    Run the optimization loop. Designed to be called from a background thread.

    Args:
        callback: Called after each iteration with (iteration_number, iter_data_dict);
            on a failed iteration the dict is ``{"error": <message>}``.
        config_override: Optional dict to use instead of loading from disk.

    Raises:
        Re-raises any unexpected exception after setting state to "error".
    """
    _stop_event.clear()
    update_status(state="running", iteration=0, error=None, best_sharpe=0.0)

    try:
        os.makedirs(RESULTS_DIR, exist_ok=True)
        ensure_data()

        config, _ = _load_config(config_override)

        history = load_iteration_history()
        start_iter = len(history) + 1
        best_sharpe = max((h["sharpe"] for h in history), default=0)
        update_status(best_sharpe=best_sharpe)

        setup_windows_remote()
        _upload_static_files()
        analyze_and_suggest = _import_analyzer()

        for iteration in range(start_iter, MAX_ITERATIONS + 1):
            if _stop_event.is_set():
                update_status(state="completed")
                return
            update_status(iteration=iteration)

            _upload_config(config)

            try:
                run_ml_training()
                results = _fetch_results(iteration)
            except _ITERATION_ERRORS as e:
                if callback:
                    callback(iteration, {"error": str(e)})
                if history:
                    config = copy.deepcopy(history[-1].get("config", config))
                continue

            current_sharpe = results.get("sharpe_ratio", 0)
            is_best = current_sharpe > best_sharpe
            if is_best:
                best_sharpe = current_sharpe
                with open(_BEST_CONFIG_PATH, "w") as f:
                    json.dump(config, f, indent=2)
                update_status(best_sharpe=best_sharpe)

            iter_data = _build_iter_data(iteration, results, config, is_best,
                                         include_results=True)
            save_iteration(iter_data)
            history.append(iter_data)

            if callback:
                callback(iteration, iter_data)

            converged, _reason = check_convergence(history)
            # Stop before spending an LLM call on an iteration that will not run.
            if converged or iteration >= MAX_ITERATIONS or _stop_event.is_set():
                update_status(state="completed")
                return

            # LLM suggestion
            try:
                new_config, reasoning = analyze_and_suggest(
                    config, results, _summarize_history(history)
                )
            except Exception:  # broad on purpose: fall back to a random perturbation
                config = _perturb_config(config)
            else:
                with _status_lock:
                    _status["llm_suggestions"].append({
                        "iteration": iteration,
                        "reasoning": reasoning,
                    })
                config = new_config

        update_status(state="completed")

    except Exception as e:
        update_status(state="error", error=str(e))
        raise


def request_stop():
    """Request graceful stop of the optimization loop (checked between iteration steps)."""
    _stop_event.set()


if __name__ == "__main__":
    main()