btc-accumulation-monitor/orchestrator.py
BizzleBot aba30f7718 fix: LLM analysis + new run button + settings page support
- Fixed LLM failing silently (401 auth error on every iteration)
- Reset provider to Ollama (working) from broken OpenRouter config
- Added /api/clear endpoint + 'New Run' button to reset history
- LLM failures now logged visibly with error details
- LLM suggestions persisted to iteration data (survive restarts)
- Settings page support via llm_settings.json (multi-provider)
2026-03-20 21:51:05 +00:00

499 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
"""
BTC Accumulation Signal Optimizer -- Orchestrator
Coordinates the optimization loop across VPS, Windows PC (GPU), and Mac Mini (LLM).
"""
import copy
import json
import os
import random
import shlex
import subprocess
import sys
import threading
import time
import traceback
from datetime import datetime, timezone
# --- Filesystem layout (everything lives next to this file) ---
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, "data")
CONFIG_DIR = os.path.join(BASE_DIR, "config")
RESULTS_DIR = os.path.join(BASE_DIR, "results")
ITERATIONS_LOG = os.path.join(RESULTS_DIR, "iterations.jsonl")  # one JSON record per iteration

# --- Remote machines (reached with ssh / scp) ---
WINDOWS_HOST = "bizzle@100.76.218.38"
WINDOWS_DIR = "btc-ml-optimizer"  # relative path on the Windows host; `cd`'d into before training
MAC_MINI_HOST = "bizzle@bizzles-mac-mini-1"  # currently unused in this file

# --- Stopping criteria and limits ---
MAX_ITERATIONS = 50
CONVERGENCE_WINDOW = 5  # size of the recent-score window examined by check_convergence
CONVERGENCE_THRESHOLD = 0.01  # (max - min) / max of that window below 1% counts as a stall
TARGET_COST_IMPROVEMENT = 20.0  # stop early once cost-basis improvement reaches 20% (exceptional)
MIN_SIGNAL_COUNT = 30  # runs with fewer strong-buy signals are not considered valid
ML_TIMEOUT = 10 * 60  # seconds allowed for one remote training run
# Colors
class C:
    """ANSI escape sequences used to style terminal output."""

    RESET = "\x1b[0m"
    BOLD = "\x1b[1m"
    DIM = "\x1b[2m"
    RED = "\x1b[91m"
    GREEN = "\x1b[92m"
    YELLOW = "\x1b[93m"
    MAGENTA = "\x1b[95m"
    CYAN = "\x1b[96m"
def log(msg, color=""):
    """Print *msg* after a dimmed UTC ``[HH:MM:SS]`` stamp, optionally colored.

    Args:
        msg: Text to print.
        color: ANSI prefix (e.g. ``C.RED``) applied to *msg*; reset afterwards.
    """
    stamp = datetime.now(timezone.utc).strftime("%H:%M:%S")
    line = f"{C.DIM}[{stamp}]{C.RESET} {color}{msg}{C.RESET}"
    print(line)
def run_cmd(cmd, timeout=120, check=True):
    """Run *cmd* through the shell and return its stripped stdout.

    Args:
        cmd: Shell command string (executed with ``shell=True``).
        timeout: Seconds before ``subprocess.TimeoutExpired`` is raised.
        check: When true, a non-zero exit status raises ``RuntimeError``
            carrying the command and its stderr.
    """
    proc = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
    failed = proc.returncode != 0
    if failed and check:
        raise RuntimeError(f"Command failed: {cmd}\n{proc.stderr}")
    return proc.stdout.strip()
def ensure_data():
    """Make sure the BTC price CSVs exist locally, fetching them if needed.

    If either ``data/btc_4h.csv`` or ``data/btc_1h.csv`` is missing, runs
    ``scripts/fetch_data.py`` from BASE_DIR with a 5-minute timeout.

    Raises:
        RuntimeError: if the fetch script exits non-zero, or exits cleanly
            without producing ``btc_4h.csv`` (the file remote training reads).
        subprocess.TimeoutExpired: if the fetch script exceeds its timeout.
    """
    data_4h = os.path.join(DATA_DIR, "btc_4h.csv")
    data_1h = os.path.join(DATA_DIR, "btc_1h.csv")
    if os.path.exists(data_4h) and os.path.exists(data_1h):
        log("Data files already exist", C.GREEN)
        return
    log("Fetching BTC data...", C.YELLOW)
    # The command goes through a shell, so quote BASE_DIR (it may contain spaces).
    run_cmd(f"cd {shlex.quote(BASE_DIR)} && python3 scripts/fetch_data.py", timeout=300)
    # Training is run with --data btc_4h.csv; fail loudly here rather than on
    # every remote training attempt. (btc_1h.csv is uploaded only if present.)
    if not os.path.exists(data_4h):
        raise RuntimeError(f"fetch_data.py finished but {data_4h} was not created")
def setup_windows_remote():
    """Create WINDOWS_DIR on the Windows PC if it is not there yet.

    The remote command uses Windows ``if not exist ... mkdir`` syntax; the ssh
    call times out after 30 seconds.
    """
    log("Ensuring Windows remote directory exists...", C.CYAN)
    remote_cmd = f"if not exist {WINDOWS_DIR} mkdir {WINDOWS_DIR}"
    run_cmd(f'ssh {WINDOWS_HOST} "{remote_cmd}"', timeout=30)
def scp_to_windows(local_path, remote_name):
    """Copy a local file into WINDOWS_DIR on the Windows PC with scp.

    Args:
        local_path: Path of the file on this machine.
        remote_name: File name to give it inside WINDOWS_DIR on the remote host.

    Raises:
        RuntimeError: if scp exits non-zero (raised by run_cmd).
        subprocess.TimeoutExpired: if the copy takes longer than 60 seconds.
    """
    # run_cmd uses a local shell, so quote the local path (BASE_DIR may contain
    # spaces). The remote side is left as-is.
    run_cmd(f"scp -q {shlex.quote(local_path)} {WINDOWS_HOST}:{WINDOWS_DIR}/{remote_name}", timeout=60)
def scp_from_windows(remote_name, local_path):
    """Copy a file out of WINDOWS_DIR on the Windows PC to a local path with scp.

    Args:
        remote_name: File name inside WINDOWS_DIR on the remote host.
        local_path: Destination path on this machine.

    Raises:
        RuntimeError: if scp exits non-zero (raised by run_cmd).
        subprocess.TimeoutExpired: if the copy takes longer than 60 seconds.
    """
    # Quote the local destination: the command runs through a local shell.
    run_cmd(f"scp -q {WINDOWS_HOST}:{WINDOWS_DIR}/{remote_name} {shlex.quote(local_path)}", timeout=60)
def run_ml_training():
    """Run train_and_backtest.py on the Windows PC over ssh and echo its output.

    The remote script reads config.json and btc_4h.csv from WINDOWS_DIR and
    writes results.json there.

    Returns:
        True on success.

    Raises:
        RuntimeError: if the remote command exits non-zero (message includes
            both stderr and stdout).
        subprocess.TimeoutExpired: if training exceeds ML_TIMEOUT seconds.
    """
    remote = (
        f"cd {WINDOWS_DIR} && python train_and_backtest.py "
        "--config config.json --data btc_4h.csv --output results.json"
    )
    cmd = f'ssh {WINDOWS_HOST} "{remote}"'
    log("Running ML training on Windows PC (GPU)...", C.MAGENTA)
    proc = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=ML_TIMEOUT)
    if proc.returncode:
        raise RuntimeError(f"ML training failed:\n{proc.stderr}\n{proc.stdout}")
    for out_line in proc.stdout.strip().split("\n"):
        log(f" {C.DIM}{out_line}", C.DIM)
    return True
def load_iteration_history():
    """Load all recorded iterations from ITERATIONS_LOG (JSON Lines).

    Returns:
        list[dict]: One dict per non-blank line, in file order. The list is
        empty when the log does not exist yet.

    A line that is not a valid JSON object (for example a record truncated by a
    crash mid-write) is skipped with a warning. Previously one bad line aborted
    the whole load and made every later resume fail.
    """
    history = []
    if not os.path.exists(ITERATIONS_LOG):
        return history
    with open(ITERATIONS_LOG) as f:
        for lineno, raw in enumerate(f, start=1):
            line = raw.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as e:
                log(f"Skipping malformed line {lineno} in {ITERATIONS_LOG}: {e}", C.YELLOW)
                continue
            # Callers use record.get(...), so anything but an object is unusable.
            if not isinstance(record, dict):
                log(f"Skipping non-object line {lineno} in {ITERATIONS_LOG}", C.YELLOW)
                continue
            history.append(record)
    return history
def save_iteration(iteration_data):
    """Append *iteration_data* to ITERATIONS_LOG as a single JSON line."""
    with open(ITERATIONS_LOG, "a") as sink:
        sink.write(f"{json.dumps(iteration_data)}\n")
def check_convergence(history):
    """Decide whether the optimization loop should stop.

    Only "valid" iterations (``signal_count >= MIN_SIGNAL_COUNT``) are
    considered, matching how the loop picks a new best. The loop should stop when:

    * the best valid cost improvement reaches TARGET_COST_IMPROVEMENT, or
    * the last CONVERGENCE_WINDOW valid iterations have a relative spread
      ``(max - min) / max`` below CONVERGENCE_THRESHOLD (only when max > 0).

    Args:
        history: Iteration records with ``signal_count`` and ``cost_improvement``.

    Returns:
        tuple[bool, str]: ``(converged, reason)``.
    """
    if len(history) < CONVERGENCE_WINDOW + 1:
        return False, "Not enough iterations"
    # Only consider valid results (enough signals)
    valid = [h for h in history if h.get("signal_count", 0) >= MIN_SIGNAL_COUNT]
    if not valid:
        return False, "No valid results yet"
    # Check if best score exceeds target
    best_score = max(h.get("cost_improvement", 0) for h in valid)
    if best_score >= TARGET_COST_IMPROVEMENT:
        return True, f"Target cost improvement reached: {best_score:.1f}%"
    # Stall detection uses valid runs too. Previously the window was taken from
    # the raw history, so low-signal runs could trigger or mask convergence.
    if len(valid) < CONVERGENCE_WINDOW:
        return False, ""
    scores = [h.get("cost_improvement", 0) for h in valid[-CONVERGENCE_WINDOW:]]
    best_recent = max(scores)
    worst_recent = min(scores)
    if best_recent > 0 and (best_recent - worst_recent) / best_recent < CONVERGENCE_THRESHOLD:
        return True, f"Converged: variance < {CONVERGENCE_THRESHOLD*100}% over {CONVERGENCE_WINDOW} iterations"
    return False, ""
def print_header():
    """Print the bold-cyan startup banner."""
    top = f"{C.BOLD}{C.CYAN}========================================================"
    bottom = f"========================================================{C.RESET}"
    banner_lines = [
        "",
        top,
        "BTC Accumulation Signal Optimizer",
        "VPS -> Windows GPU -> Mac Mini LLM -> Loop",
        bottom,
        "",
    ]
    print("\n".join(banner_lines))
def print_results(results, iteration):
    """Pretty-print one iteration's backtest metrics.

    The cost improvement is colored green above 15, yellow above 10, and red
    otherwise. Missing metrics print as 0 (or ``[]`` for per-window values).

    Args:
        results: Dict loaded from the remote results.json.
        iteration: Iteration number shown in the heading.
    """
    get = results.get
    cost_imp = get("cost_basis_improvement_pct", 0)
    if cost_imp > 15:
        color = C.GREEN
    elif cost_imp > 10:
        color = C.YELLOW
    else:
        color = C.RED
    rows = [
        "",
        f"{C.BOLD}--- Iteration {iteration} Results ---{C.RESET}",
        f"Cost Improvement: {color}{C.BOLD}{cost_imp:.1f}%{C.RESET}",
        f"Avg Cost (Model): ${get('avg_cost_basis_model', 0):,.2f}",
        f"Avg Cost (DCA): ${get('avg_cost_basis_dca', 0):,.2f}",
        f"Strong Signals: {get('strong_buy_signal_count', 0)}",
        f"Signal Frequency: {get('signal_frequency_pct', 0):.1f}%",
        f"Quality Score: {get('pct_quality_strong_buy', 0):.1%}",
        f"Model R2: {get('model_r2_score', 0):.4f}",
        f"Score@Bottoms: {get('avg_score_at_actual_bottoms', 0):.1f}",
        f"Score@Tops: {get('avg_score_at_actual_tops', 0):.1f}",
        f"Window Improvements: {get('per_window_cost_improvement', [])}",
        "",
    ]
    print("\n".join(rows))
def main():
    """Run the optimizer interactively from the command line.

    Loads config/best_config.json if it exists (otherwise
    config/initial_config.json) and resumes after the last iteration recorded
    in ITERATIONS_LOG. It uploads the ML engine and data CSVs to the Windows
    PC, then loops up to MAX_ITERATIONS times:

    * train remotely;
    * download and print the results, then append them to the log;
    * stop on convergence;
    * otherwise ask the LLM (``analyzer.analyze_and_suggest``) for the next
      config, falling back to a random learning-rate / max-depth perturbation
      if that call fails.
    """
    print_header()
    os.makedirs(RESULTS_DIR, exist_ok=True)
    ensure_data()
    config_path = os.path.join(CONFIG_DIR, "initial_config.json")
    best_config_path = os.path.join(CONFIG_DIR, "best_config.json")
    if os.path.exists(best_config_path):
        log("Resuming from best_config.json", C.GREEN)
        with open(best_config_path) as f:
            config = json.load(f)
    else:
        with open(config_path) as f:
            config = json.load(f)
    history = load_iteration_history()
    # Failed training runs are never logged, so recorded iteration numbers can
    # have gaps. Resume after the highest one instead of len(history); otherwise
    # an existing iteration (and its results_iter_N.json) would be redone.
    start_iter = max((h.get("iteration", 0) for h in history), default=0) + 1
    # Only runs with enough signals may become "best" (see is_best below), so
    # seed best_score from those alone.
    best_score = max(
        (h.get("cost_improvement", 0) for h in history
         if h.get("signal_count", 0) >= MIN_SIGNAL_COUNT),
        default=0,
    )
    log(f"Starting at iteration {start_iter}, best cost improvement so far: {best_score:.1f}%", C.BOLD)
    setup_windows_remote()
    log("Uploading ML engine to Windows...", C.CYAN)
    scp_to_windows(os.path.join(BASE_DIR, "ml_engine", "train_and_backtest.py"), "train_and_backtest.py")
    for tf in ["1h", "4h"]:
        data_file = os.path.join(DATA_DIR, f"btc_{tf}.csv")
        if os.path.exists(data_file):
            log(f"Uploading btc_{tf}.csv to Windows...", C.CYAN)
            scp_to_windows(data_file, f"btc_{tf}.csv")
    # The LLM client module lives in llm_client/ next to this file.
    sys.path.insert(0, os.path.join(BASE_DIR, "llm_client"))
    from analyzer import analyze_and_suggest

    for iteration in range(start_iter, MAX_ITERATIONS + 1):
        log(f"\n{'='*50}", C.BOLD)
        log(f"ITERATION {iteration}/{MAX_ITERATIONS}", f"{C.BOLD}{C.CYAN}")
        log(f"Model: {config.get('model_type', 'unknown')}, "
            f"LR: {config.get('hyperparameters', {}).get('learning_rate', '?')}, "
            f"Depth: {config.get('hyperparameters', {}).get('max_depth', '?')}", C.DIM)
        log(f"{'='*50}", C.BOLD)
        tmp_config = os.path.join(CONFIG_DIR, "current_config.json")
        with open(tmp_config, "w") as f:
            json.dump(config, f, indent=2)
        scp_to_windows(tmp_config, "config.json")
        try:
            run_ml_training()
        except (RuntimeError, subprocess.TimeoutExpired) as e:
            log(f"ML training failed: {e}", C.RED)
            log("Reverting to previous config and continuing...", C.YELLOW)
            if history:
                # Copy, so a later in-place perturbation cannot rewrite the
                # config recorded for the previous iteration.
                config = copy.deepcopy(history[-1].get("config", config))
            continue
        results_local = os.path.join(RESULTS_DIR, f"results_iter_{iteration}.json")
        scp_from_windows("results.json", results_local)
        with open(results_local) as f:
            results = json.load(f)
        print_results(results, iteration)
        current_score = results.get("cost_basis_improvement_pct", 0)
        signal_count = results.get("strong_buy_signal_count", 0)
        is_best = current_score > best_score and signal_count >= MIN_SIGNAL_COUNT
        if is_best:
            best_score = current_score
            with open(best_config_path, "w") as f:
                json.dump(config, f, indent=2)
            log(f"NEW BEST! Cost Improvement: {best_score:.1f}%", f"{C.BOLD}{C.GREEN}")
        iter_data = {
            "iteration": iteration,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "cost_improvement": current_score,
            # Legacy, misleadingly named keys, kept for existing log readers.
            "avg_30d_return": results.get("avg_quality_score_strong_buy", 0),
            "avg_90d_return": results.get("pct_quality_strong_buy", 0),
            # Same field name the dashboard loop (run_optimization_loop) writes.
            "quality": results.get("pct_quality_strong_buy", 0),
            "signal_count": signal_count,
            "signal_frequency": results.get("signal_frequency_pct", 0),
            "r2_score": results.get("model_r2_score", 0),
            "score_at_bottoms": results.get("avg_score_at_actual_bottoms", 0),
            "score_at_tops": results.get("avg_score_at_actual_tops", 0),
            "model_type": config.get("model_type", "unknown"),
            "is_best": is_best,
            "config": config,
            "results": results,
        }
        save_iteration(iter_data)
        history.append(iter_data)
        converged, reason = check_convergence(history)
        if converged:
            log(f"\nOptimization converged: {reason}", f"{C.BOLD}{C.GREEN}")
            break
        if iteration >= MAX_ITERATIONS:
            log(f"\nMax iterations ({MAX_ITERATIONS}) reached.", C.YELLOW)
            break
        log("\nConsulting LLM for strategy modifications...", C.MAGENTA)
        try:
            summary_history = [
                {
                    "iteration": h["iteration"],
                    "cost_improvement": h.get("cost_improvement", 0),
                    "signal_count": h.get("signal_count", 0),
                    "r2_score": h.get("r2_score", 0),
                    "model_type": h.get("model_type", "unknown"),
                }
                for h in history
            ]
            new_config, reasoning = analyze_and_suggest(config, results, summary_history)
            # A non-dict config would crash the next iteration outside this try;
            # treat it as an LLM failure instead.
            if not isinstance(new_config, dict):
                raise TypeError(f"LLM returned a non-dict config ({type(new_config).__name__})")
            log(f" LLM reasoning: {str(reasoning)[:200]}...", C.DIM)
            config = new_config
        except Exception as e:
            log(f"LLM call failed: {e}", C.RED)
            log("Continuing with current config + random perturbation...", C.YELLOW)
            # Perturb a copy: `config` is the same object as history[-1]["config"],
            # so mutating it in place would corrupt the in-memory record of what ran.
            config = copy.deepcopy(config)
            hp = config.get("hyperparameters", {})
            hp["learning_rate"] = hp.get("learning_rate", 0.01) * random.uniform(0.8, 1.2)
            hp["max_depth"] = max(3, min(10, hp.get("max_depth", 5) + random.choice([-1, 0, 1])))
            config["hyperparameters"] = hp
    print(f"""
{C.BOLD}{C.GREEN}========================================================
Optimization Complete!
========================================================{C.RESET}
Total Iterations: {len(history)}
Best Cost Improvement: {C.BOLD}{best_score:.1f}%{C.RESET}
Best Config: {best_config_path}
Iteration Log: {ITERATIONS_LOG}
""")
# --- Library API for dashboard integration ---
# State shared between the background optimization thread and the dashboard.
_stop_event = threading.Event()  # set by request_stop(); cleared when a run starts
_status = dict(
    state="idle",
    iteration=0,
    max_iterations=MAX_ITERATIONS,
    best_score=0.0,
    error=None,
    llm_suggestions=[],
)
_status_lock = threading.Lock()  # guards reads/writes of _status
def get_status():
    """Return a snapshot of the optimization status (thread-safe).

    The copy is taken while holding ``_status_lock``. The ``llm_suggestions``
    list is copied as well. Previously the snapshot shared the live list, so
    the caller could observe (or mutate) it while the optimization thread
    appended to it.

    Returns:
        dict: Independent copy of the status fields.
    """
    with _status_lock:
        snapshot = dict(_status)
        suggestions = snapshot.get("llm_suggestions")
        if isinstance(suggestions, list):
            snapshot["llm_suggestions"] = list(suggestions)
    return snapshot
def update_status(**kwargs):
    """Set the given status fields while holding the status lock.

    Args:
        **kwargs: Field name / value pairs to store in the shared status.
    """
    with _status_lock:
        for field, value in kwargs.items():
            _status[field] = value
def _suggest_next_config(analyze_and_suggest, config, results, history, iteration, iter_data):
    """Return the config for the next iteration and record how it was chosen.

    Asks the LLM via *analyze_and_suggest*. If that raises, or returns a config
    that is not a dict, it falls back to perturbing a deep copy of *config*:
    learning_rate is scaled by 0.8-1.2 and max_depth is shifted by -1/0/+1,
    clamped to 3..10.

    In both cases it appends an entry to ``_status["llm_suggestions"]`` and sets
    ``iter_data["llm_reasoning"]`` and ``iter_data["llm_applied"]``.
    """
    try:
        summary_history = [
            {k: h[k] for k in ("iteration", "cost_improvement", "signal_count", "r2_score", "model_type")
             if k in h}
            for h in history
        ]
        new_config, reasoning = analyze_and_suggest(config, results, summary_history)
        # A non-dict config would crash the next iteration; treat it as a failure.
        if not isinstance(new_config, dict):
            raise TypeError(f"LLM returned a non-dict config ({type(new_config).__name__})")
    except Exception as e:
        err_msg = f"LLM call failed: {type(e).__name__}: {e}"
        log(f"WARNING: {err_msg}", C.RED)
        traceback.print_exc()
        with _status_lock:
            _status["llm_suggestions"].append({
                "iteration": iteration,
                "reasoning": f"ERROR: {err_msg} — using random perturbation",
            })
        iter_data["llm_reasoning"] = err_msg
        iter_data["llm_applied"] = False
        # Perturb a copy: `config` is iter_data["config"] (and possibly the
        # caller's config_override), so in-place edits would corrupt the record.
        perturbed = copy.deepcopy(config)
        hp = perturbed.get("hyperparameters", {})
        hp["learning_rate"] = hp.get("learning_rate", 0.01) * random.uniform(0.8, 1.2)
        hp["max_depth"] = max(3, min(10, hp.get("max_depth", 5) + random.choice([-1, 0, 1])))
        perturbed["hyperparameters"] = hp
        return perturbed
    with _status_lock:
        _status["llm_suggestions"].append({
            "iteration": iteration,
            "reasoning": reasoning,
        })
    iter_data["llm_reasoning"] = reasoning
    iter_data["llm_applied"] = True
    return new_config


def run_optimization_loop(callback=None, config_override=None):
    """Run the optimization loop; meant to be called from a background thread.

    Mirrors main(), but reports progress through the shared status (see
    get_status) instead of the terminal, and can be stopped with request_stop().

    Args:
        callback: Optional ``callback(iteration, data)``. It is called with the
            iteration record right after each successful training run (before
            the LLM is consulted), or with ``{"error": message}`` when training
            fails.
        config_override: Optional starting config, used instead of
            best_config.json / initial_config.json. It is deep-copied, so the
            caller's dict is never modified.

    Raises:
        Exception: Any unexpected error is recorded as ``state="error"`` and
            re-raised.
    """
    _stop_event.clear()
    update_status(state="running", iteration=0, error=None, best_score=0.0)
    try:
        os.makedirs(RESULTS_DIR, exist_ok=True)
        ensure_data()
        config_path = os.path.join(CONFIG_DIR, "initial_config.json")
        best_config_path = os.path.join(CONFIG_DIR, "best_config.json")
        if config_override:
            config = copy.deepcopy(config_override)
        elif os.path.exists(best_config_path):
            with open(best_config_path) as f:
                config = json.load(f)
        else:
            with open(config_path) as f:
                config = json.load(f)
        history = load_iteration_history()
        # Failed training runs are not logged, so iteration numbers can have
        # gaps. Continue after the highest one (not len(history)) so no
        # iteration number or results_iter_N.json file is reused.
        start_iter = max((h.get("iteration", 0) for h in history), default=0) + 1
        # Only runs with enough signals may become best (see is_best below).
        best_score = max(
            (h.get("cost_improvement", 0) for h in history
             if h.get("signal_count", 0) >= MIN_SIGNAL_COUNT),
            default=0,
        )
        update_status(best_score=best_score)
        setup_windows_remote()
        scp_to_windows(os.path.join(BASE_DIR, "ml_engine", "train_and_backtest.py"), "train_and_backtest.py")
        for tf in ["1h", "4h"]:
            data_file = os.path.join(DATA_DIR, f"btc_{tf}.csv")
            if os.path.exists(data_file):
                scp_to_windows(data_file, f"btc_{tf}.csv")
        sys.path.insert(0, os.path.join(BASE_DIR, "llm_client"))
        from analyzer import analyze_and_suggest

        for iteration in range(start_iter, MAX_ITERATIONS + 1):
            if _stop_event.is_set():
                update_status(state="completed")
                return
            update_status(iteration=iteration)
            tmp_config = os.path.join(CONFIG_DIR, "current_config.json")
            with open(tmp_config, "w") as f:
                json.dump(config, f, indent=2)
            scp_to_windows(tmp_config, "config.json")
            try:
                run_ml_training()
            except (RuntimeError, subprocess.TimeoutExpired) as e:
                if callback:
                    callback(iteration, {"error": str(e)})
                if history:
                    # Copy, so later perturbations cannot alter the logged config.
                    config = copy.deepcopy(history[-1].get("config", config))
                continue
            results_local = os.path.join(RESULTS_DIR, f"results_iter_{iteration}.json")
            scp_from_windows("results.json", results_local)
            with open(results_local) as f:
                results = json.load(f)
            current_score = results.get("cost_basis_improvement_pct", 0)
            signal_count = results.get("strong_buy_signal_count", 0)
            is_best = current_score > best_score and signal_count >= MIN_SIGNAL_COUNT
            if is_best:
                best_score = current_score
                with open(best_config_path, "w") as f:
                    json.dump(config, f, indent=2)
                update_status(best_score=best_score)
            iter_data = {
                "iteration": iteration,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "cost_improvement": current_score,
                "signal_count": signal_count,
                "signal_frequency": results.get("signal_frequency_pct", 0),
                "r2_score": results.get("model_r2_score", 0),
                "score_at_bottoms": results.get("avg_score_at_actual_bottoms", 0),
                "score_at_tops": results.get("avg_score_at_actual_tops", 0),
                "quality": results.get("pct_quality_strong_buy", 0),
                "model_type": config.get("model_type", "unknown"),
                "is_best": is_best,
                "config": config,
                "results": results,
            }
            history.append(iter_data)
            if callback:
                callback(iteration, iter_data)

            converged, _reason = check_convergence(history)
            finished = converged or iteration >= MAX_ITERATIONS or _stop_event.is_set()
            if not finished:
                config = _suggest_next_config(
                    analyze_and_suggest, config, results, history, iteration, iter_data
                )
            # Persist only after the LLM step, so llm_reasoning / llm_applied are
            # written too. Previously they were added after the record had
            # already been saved, so they never reached the log.
            save_iteration(iter_data)
            if finished:
                update_status(state="completed")
                return
        update_status(state="completed")
    except Exception as e:
        update_status(state="error", error=str(e))
        raise
def request_stop():
    """Ask a running run_optimization_loop to finish gracefully.

    This only sets a flag. The loop checks it at the start of each iteration
    and again after recording results, before consulting the LLM, so an
    in-flight training run is allowed to complete. run_optimization_loop clears
    the flag when it starts, so calling this before a run has no effect on that run.
    """
    _stop_event.set()
# Script entry point: run the interactive command-line optimizer.
if __name__ == "__main__":
    main()