Replace day-trading bot with long-term accumulation signal model. Predicts optimal BUY times using forward return analysis at 7d/30d/90d horizons, scoring each candle 0-100. Primary metric is now cost_basis_improvement_pct (model buy price vs DCA). - train_and_backtest.py: regression models (XGBoost/LSTM hybrid), accumulation-focused features (price position, momentum, volatility, volume, cycle), forward return targets, signal quality backtesting - orchestrator.py: cost improvement scoring, signal count validation - analyzer.py: accumulation-focused LLM system prompt - dashboard: cost improvement display, signal metrics table - config: new accumulation-focused parameters Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
486 lines
17 KiB
Python
Executable File
486 lines
17 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
BTC Accumulation Signal Optimizer -- Orchestrator
|
|
Coordinates the optimization loop across VPS, Windows PC (GPU), and Mac Mini (LLM).
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
# Paths
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, "data")
CONFIG_DIR = os.path.join(BASE_DIR, "config")
RESULTS_DIR = os.path.join(BASE_DIR, "results")
ITERATIONS_LOG = os.path.join(RESULTS_DIR, "iterations.jsonl")

# Remote machines (SSH targets). The defaults are this deployment's hosts;
# set the environment variables to point the optimizer at other machines
# without editing the source.
WINDOWS_HOST = os.environ.get("BTC_OPT_WINDOWS_HOST", "bizzle@100.76.218.38")
# Relative path, resolved against the SSH session's starting directory.
WINDOWS_DIR = os.environ.get("BTC_OPT_WINDOWS_DIR", "btc-ml-optimizer")
MAC_MINI_HOST = os.environ.get("BTC_OPT_MAC_MINI_HOST", "bizzle@bizzles-mac-mini-1")

# Convergence
MAX_ITERATIONS = 50
CONVERGENCE_WINDOW = 5  # number of recent iterations compared when detecting a stall
# Stall criterion: relative spread (best - worst) / best of the recent scores
# must fall below this fraction (0.01 == 1%).
CONVERGENCE_THRESHOLD = 0.01
TARGET_COST_IMPROVEMENT = 20.0  # 20% cost basis improvement = exceptional
MIN_SIGNAL_COUNT = 30  # Minimum strong buy signals for valid results
ML_TIMEOUT = 600  # seconds (10 minutes) allowed for one remote training run
|
|
|
|
# Colors
|
|
class C:
    """ANSI terminal escape sequences used to colour console output."""

    # Text attributes
    RESET = "\x1b[0m"
    BOLD = "\x1b[1m"
    DIM = "\x1b[2m"

    # Bright foreground colours
    RED = "\x1b[91m"
    GREEN = "\x1b[92m"
    YELLOW = "\x1b[93m"
    MAGENTA = "\x1b[95m"
    CYAN = "\x1b[96m"
|
|
|
|
|
|
def log(msg, color=""):
    """Print *msg* to stdout behind a dimmed UTC ``[HH:MM:SS]`` timestamp.

    ``color`` is an ANSI escape prefix (normally a ``C`` attribute) applied
    to the message; the terminal style is reset at the end of the line.
    """
    stamp = datetime.now(timezone.utc).strftime("%H:%M:%S")
    prefix = C.DIM + "[" + stamp + "]" + C.RESET
    print(f"{prefix} {color}{msg}{C.RESET}")
|
|
|
|
|
|
def run_cmd(cmd, timeout=120, check=True):
    """Execute *cmd* through the shell and return its stdout, stripped.

    Args:
        cmd: Command line string, run with ``shell=True``.
        timeout: Seconds before ``subprocess.TimeoutExpired`` is raised.
        check: When true, a non-zero exit status raises ``RuntimeError``
            whose message contains the command and its stderr.
    """
    proc = subprocess.run(
        cmd,
        shell=True,
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    failed = proc.returncode != 0
    if failed and check:
        raise RuntimeError(f"Command failed: {cmd}\n{proc.stderr}")
    return proc.stdout.strip()
|
|
|
|
|
|
def ensure_data():
    """Make sure the BTC data CSVs exist locally, fetching them if needed.

    Looks for ``btc_4h.csv`` and ``btc_1h.csv`` in ``DATA_DIR``. If either is
    missing, runs ``scripts/fetch_data.py`` from ``BASE_DIR`` (5-minute
    timeout) and then verifies that both files were actually produced.

    Raises:
        RuntimeError: the fetch script exits non-zero, or finishes without
            creating the expected files.
        subprocess.TimeoutExpired: the fetch script exceeds its timeout.
    """
    import shlex

    expected = [os.path.join(DATA_DIR, name) for name in ("btc_4h.csv", "btc_1h.csv")]
    if all(os.path.exists(path) for path in expected):
        log("Data files already exist", C.GREEN)
        return

    log("Fetching BTC data...", C.YELLOW)
    # Quote the directory so a checkout path containing spaces or shell
    # metacharacters does not break the `cd`.
    run_cmd(f"cd {shlex.quote(BASE_DIR)} && python3 scripts/fetch_data.py", timeout=300)

    missing = [path for path in expected if not os.path.exists(path)]
    if missing:
        raise RuntimeError(
            f"fetch_data.py finished but did not create: {', '.join(missing)}"
        )
|
|
|
|
|
|
def setup_windows_remote():
    """Create ``WINDOWS_DIR`` on the Windows PC over SSH if it is missing.

    The remote command uses cmd.exe syntax (``if not exist ... mkdir ...``);
    the directory is relative to the SSH session's starting directory.
    """
    log("Ensuring Windows remote directory exists...", C.CYAN)
    remote_cmd = f"if not exist {WINDOWS_DIR} mkdir {WINDOWS_DIR}"
    run_cmd(f'ssh {WINDOWS_HOST} "{remote_cmd}"', timeout=30)
|
|
|
|
|
|
def scp_to_windows(local_path, remote_name):
    """Copy *local_path* to ``WINDOWS_DIR/remote_name`` on the Windows PC.

    The local path is shell-quoted so paths containing spaces or shell
    metacharacters are passed to scp intact.

    Raises:
        RuntimeError: scp exits non-zero (raised by ``run_cmd``).
        subprocess.TimeoutExpired: the copy takes longer than 60 seconds.
    """
    import shlex

    run_cmd(
        f"scp -q {shlex.quote(local_path)} {WINDOWS_HOST}:{WINDOWS_DIR}/{remote_name}",
        timeout=60,
    )
|
|
|
|
|
|
def scp_from_windows(remote_name, local_path):
    """Copy ``WINDOWS_DIR/remote_name`` from the Windows PC to *local_path*.

    The local destination is shell-quoted so paths containing spaces or
    shell metacharacters are passed to scp intact.

    Raises:
        RuntimeError: scp exits non-zero (raised by ``run_cmd``).
        subprocess.TimeoutExpired: the copy takes longer than 60 seconds.
    """
    import shlex

    run_cmd(
        f"scp -q {WINDOWS_HOST}:{WINDOWS_DIR}/{remote_name} {shlex.quote(local_path)}",
        timeout=60,
    )
|
|
|
|
|
|
def run_ml_training():
    """Run ``train_and_backtest.py`` inside ``WINDOWS_DIR`` on the Windows PC.

    The script is invoked over SSH with ``config.json`` and ``btc_4h.csv``
    (uploaded beforehand) and is told to write ``results.json`` remotely.
    Its stdout is echoed locally, one dimmed log line per output line.

    Returns:
        True once the remote run exits with status 0.

    Raises:
        RuntimeError: the remote run exits non-zero (message carries stderr
            followed by stdout).
        subprocess.TimeoutExpired: the run exceeds ``ML_TIMEOUT`` seconds.
    """
    remote = (
        f"cd {WINDOWS_DIR} && python train_and_backtest.py "
        "--config config.json --data btc_4h.csv --output results.json"
    )
    cmd = f'ssh {WINDOWS_HOST} "{remote}"'

    log("Running ML training on Windows PC (GPU)...", C.MAGENTA)
    proc = subprocess.run(
        cmd,
        shell=True,
        capture_output=True,
        text=True,
        timeout=ML_TIMEOUT,
    )
    if proc.returncode:
        raise RuntimeError(f"ML training failed:\n{proc.stderr}\n{proc.stdout}")

    for out_line in proc.stdout.strip().split("\n"):
        log(f" {C.DIM}{out_line}", C.DIM)
    return True
|
|
|
|
|
|
def load_iteration_history():
    """Load previously recorded iterations from the ``ITERATIONS_LOG`` JSONL file.

    Returns:
        list[dict]: one entry per non-blank, well-formed line, in file order.
        Returns an empty list when the log does not exist yet.

    A line that is not valid JSON (e.g. a record cut short when the process
    was killed mid-write) is skipped with a warning. Without this, a single
    bad line would make every later resume fail.
    """
    history = []
    if not os.path.exists(ITERATIONS_LOG):
        return history

    with open(ITERATIONS_LOG, encoding="utf-8") as f:
        for lineno, raw in enumerate(f, start=1):
            raw = raw.strip()
            if not raw:
                continue
            try:
                history.append(json.loads(raw))
            except json.JSONDecodeError as e:
                log(f"Skipping malformed line {lineno} in {ITERATIONS_LOG}: {e}", C.YELLOW)
    return history
|
|
|
|
|
|
def save_iteration(iteration_data):
    """Append *iteration_data* to ``ITERATIONS_LOG`` as a single JSON line."""
    with open(ITERATIONS_LOG, "a") as log_file:
        log_file.write(f"{json.dumps(iteration_data)}\n")
|
|
|
|
|
|
def check_convergence(history, window=None, threshold=None, target=None, min_signals=None):
    """Decide whether the optimisation loop should stop.

    Only "valid" iterations (``signal_count >= min_signals``) are scored.
    Runs that produced too few strong-buy signals are ignored both for the
    target check and for stall detection.

    Args:
        history: Iteration records, each a dict with ``cost_improvement`` and
            ``signal_count`` keys (missing keys count as 0).
        window: Number of recent valid iterations compared for a stall.
            Defaults to ``CONVERGENCE_WINDOW``.
        threshold: Relative spread ``(best - worst) / best`` below which the
            window counts as stalled. Defaults to ``CONVERGENCE_THRESHOLD``.
        target: Cost improvement (%) that ends the run immediately.
            Defaults to ``TARGET_COST_IMPROVEMENT``.
        min_signals: Minimum strong-buy signals for a valid iteration.
            Defaults to ``MIN_SIGNAL_COUNT``.

    Returns:
        tuple[bool, str]: ``(converged, reason)``.
    """
    # Resolve defaults lazily so explicit arguments never touch the globals.
    window = CONVERGENCE_WINDOW if window is None else window
    threshold = CONVERGENCE_THRESHOLD if threshold is None else threshold
    target = TARGET_COST_IMPROVEMENT if target is None else target
    min_signals = MIN_SIGNAL_COUNT if min_signals is None else min_signals

    if len(history) < window + 1:
        return False, "Not enough iterations"

    valid = [h for h in history if h.get("signal_count", 0) >= min_signals]
    if not valid:
        return False, "No valid results yet"

    best_score = max(h.get("cost_improvement", 0) for h in valid)
    if best_score >= target:
        return True, f"Target cost improvement reached: {best_score:.1f}%"

    # Stall detection also looks only at valid runs. An invalid run with an
    # outlier score must not mask (or fake) a plateau.
    if len(valid) < window:
        return False, "Not enough valid iterations"
    scores = [h.get("cost_improvement", 0) for h in valid[-window:]]
    best_recent = max(scores)
    worst_recent = min(scores)
    if best_recent > 0 and (best_recent - worst_recent) / best_recent < threshold:
        return True, f"Converged: variance < {threshold*100}% over {window} iterations"

    return False, ""
|
|
|
|
|
|
def print_header():
    """Print the coloured start-up banner (title plus pipeline overview)."""
    banner_lines = [
        "",
        f"{C.BOLD}{C.CYAN}========================================================",
        "BTC Accumulation Signal Optimizer",
        "VPS -> Windows GPU -> Mac Mini LLM -> Loop",
        f"========================================================{C.RESET}",
        "",
    ]
    print("\n".join(banner_lines))
|
|
|
|
|
|
def print_results(results, iteration):
    """Print a coloured, human-readable summary of one iteration's metrics.

    Args:
        results: Metrics dict loaded from the remote ``results.json``. Missing
            keys are shown as 0 (``[]`` for the per-window list).
        iteration: Iteration number used in the heading.

    The cost improvement is green above 15%, yellow above 10%, red otherwise.
    """
    metric = results.get
    cost_imp = metric("cost_basis_improvement_pct", 0)
    if cost_imp > 15:
        color = C.GREEN
    elif cost_imp > 10:
        color = C.YELLOW
    else:
        color = C.RED

    # NOTE(review): ``:.1%`` multiplies by 100, i.e. it assumes
    # pct_quality_strong_buy is a 0-1 fraction -- confirm with the ML engine.
    summary_lines = [
        "",
        f"{C.BOLD}--- Iteration {iteration} Results ---{C.RESET}",
        f"Cost Improvement: {color}{C.BOLD}{cost_imp:.1f}%{C.RESET}",
        f"Avg Cost (Model): ${metric('avg_cost_basis_model', 0):,.2f}",
        f"Avg Cost (DCA): ${metric('avg_cost_basis_dca', 0):,.2f}",
        f"Strong Signals: {metric('strong_buy_signal_count', 0)}",
        f"Signal Frequency: {metric('signal_frequency_pct', 0):.1f}%",
        f"Quality Score: {metric('pct_quality_strong_buy', 0):.1%}",
        f"Model R2: {metric('model_r2_score', 0):.4f}",
        f"Score@Bottoms: {metric('avg_score_at_actual_bottoms', 0):.1f}",
        f"Score@Tops: {metric('avg_score_at_actual_tops', 0):.1f}",
        f"Window Improvements: {metric('per_window_cost_improvement', [])}",
        "",
    ]
    print("\n".join(summary_lines))
|
|
|
|
|
|
def main():
    """Run the interactive optimisation loop (command-line entry point).

    Loads ``best_config.json`` when present (otherwise ``initial_config.json``)
    and resumes after the iterations already in ``ITERATIONS_LOG``. It then
    uploads the ML engine and data to the Windows PC and repeats:
    train/backtest remotely -> fetch and print results -> record the
    iteration -> check convergence -> ask the LLM (``analyze_and_suggest``)
    for the next config. If the LLM call fails, the learning rate and max
    depth are randomly perturbed instead.
    """
    import copy
    import random

    print_header()
    os.makedirs(RESULTS_DIR, exist_ok=True)

    ensure_data()

    config_path = os.path.join(CONFIG_DIR, "initial_config.json")
    best_config_path = os.path.join(CONFIG_DIR, "best_config.json")

    if os.path.exists(best_config_path):
        log("Resuming from best_config.json", C.GREEN)
        with open(best_config_path) as f:
            config = json.load(f)
    else:
        with open(config_path) as f:
            config = json.load(f)

    history = load_iteration_history()
    start_iter = len(history) + 1
    # Only runs with enough strong-buy signals can become "best" (see is_best
    # below), so seed best_score the same way. Otherwise a high-scoring but
    # invalid past run would block every later valid improvement.
    best_score = max(
        (
            h.get("cost_improvement", 0)
            for h in history
            if h.get("signal_count", 0) >= MIN_SIGNAL_COUNT
        ),
        default=0,
    )

    log(f"Starting at iteration {start_iter}, best cost improvement so far: {best_score:.1f}%", C.BOLD)

    setup_windows_remote()

    log("Uploading ML engine to Windows...", C.CYAN)
    scp_to_windows(os.path.join(BASE_DIR, "ml_engine", "train_and_backtest.py"), "train_and_backtest.py")

    for tf in ["1h", "4h"]:
        data_file = os.path.join(DATA_DIR, f"btc_{tf}.csv")
        if os.path.exists(data_file):
            log(f"Uploading btc_{tf}.csv to Windows...", C.CYAN)
            scp_to_windows(data_file, f"btc_{tf}.csv")

    sys.path.insert(0, os.path.join(BASE_DIR, "llm_client"))
    from analyzer import analyze_and_suggest

    for iteration in range(start_iter, MAX_ITERATIONS + 1):
        hp_view = config.get("hyperparameters", {})
        log(f"\n{'='*50}", C.BOLD)
        log(f"ITERATION {iteration}/{MAX_ITERATIONS}", f"{C.BOLD}{C.CYAN}")
        log(f"Model: {config.get('model_type', 'unknown')}, "
            f"LR: {hp_view.get('learning_rate', '?')}, "
            f"Depth: {hp_view.get('max_depth', '?')}", C.DIM)
        log(f"{'='*50}", C.BOLD)

        tmp_config = os.path.join(CONFIG_DIR, "current_config.json")
        with open(tmp_config, "w") as f:
            json.dump(config, f, indent=2)
        scp_to_windows(tmp_config, "config.json")

        results_local = os.path.join(RESULTS_DIR, f"results_iter_{iteration}.json")
        try:
            run_ml_training()
            # Retrieving or parsing the results can fail too (scp error,
            # truncated JSON). Treat that like a failed run instead of
            # crashing the whole optimisation.
            scp_from_windows("results.json", results_local)
            with open(results_local) as f:
                results = json.load(f)
        except (RuntimeError, subprocess.TimeoutExpired, OSError, ValueError) as e:
            log(f"ML training failed: {e}", C.RED)
            log("Reverting to previous config and continuing...", C.YELLOW)
            if history:
                # Copy so later in-place edits cannot alter the recorded history.
                config = copy.deepcopy(history[-1].get("config", config))
            continue

        print_results(results, iteration)

        current_score = results.get("cost_basis_improvement_pct", 0)
        signal_count = results.get("strong_buy_signal_count", 0)
        is_best = current_score > best_score and signal_count >= MIN_SIGNAL_COUNT

        if is_best:
            best_score = current_score
            with open(best_config_path, "w") as f:
                json.dump(config, f, indent=2)
            log(f"NEW BEST! Cost Improvement: {best_score:.1f}%", f"{C.BOLD}{C.GREEN}")

        iter_data = {
            "iteration": iteration,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "cost_improvement": current_score,
            # NOTE: "avg_30d_return"/"avg_90d_return" are historical misnomers.
            # They hold strong-buy quality metrics, not forward returns. They
            # are kept for existing readers of iterations.jsonl; "quality"
            # matches the key written by run_optimization_loop().
            "avg_30d_return": results.get("avg_quality_score_strong_buy", 0),
            "avg_90d_return": results.get("pct_quality_strong_buy", 0),
            "quality": results.get("pct_quality_strong_buy", 0),
            "signal_count": signal_count,
            "signal_frequency": results.get("signal_frequency_pct", 0),
            "r2_score": results.get("model_r2_score", 0),
            "score_at_bottoms": results.get("avg_score_at_actual_bottoms", 0),
            "score_at_tops": results.get("avg_score_at_actual_tops", 0),
            "model_type": config.get("model_type", "unknown"),
            "is_best": is_best,
            # Snapshot: `config` may be modified in place later in this loop.
            "config": copy.deepcopy(config),
            "results": results,
        }
        save_iteration(iter_data)
        history.append(iter_data)

        converged, reason = check_convergence(history)
        if converged:
            log(f"\nOptimization converged: {reason}", f"{C.BOLD}{C.GREEN}")
            break

        if iteration >= MAX_ITERATIONS:
            log(f"\nMax iterations ({MAX_ITERATIONS}) reached.", C.YELLOW)
            break

        log("\nConsulting LLM for strategy modifications...", C.MAGENTA)
        try:
            summary_history = [
                {
                    "iteration": h["iteration"],
                    "cost_improvement": h.get("cost_improvement", 0),
                    "signal_count": h.get("signal_count", 0),
                    "r2_score": h.get("r2_score", 0),
                    "model_type": h.get("model_type", "unknown"),
                }
                for h in history
            ]
            new_config, reasoning = analyze_and_suggest(config, results, summary_history)
        except Exception as e:
            log(f"LLM call failed: {e}", C.RED)
            log("Continuing with current config + random perturbation...", C.YELLOW)
            # Perturb a private copy so no previously recorded config is touched.
            config = copy.deepcopy(config)
            hp = config.get("hyperparameters", {})
            hp["learning_rate"] = hp.get("learning_rate", 0.01) * random.uniform(0.8, 1.2)
            hp["max_depth"] = max(3, min(10, hp.get("max_depth", 5) + random.choice([-1, 0, 1])))
            config["hyperparameters"] = hp
        else:
            # Adopt the suggestion before logging, so an odd `reasoning` value
            # cannot cause a successful LLM response to be discarded.
            config = new_config
            log(f" LLM reasoning: {str(reasoning)[:200]}...", C.DIM)

    print(f"""
{C.BOLD}{C.GREEN}========================================================
Optimization Complete!
========================================================{C.RESET}

Total Iterations: {len(history)}
Best Cost Improvement: {C.BOLD}{best_score:.1f}%{C.RESET}
Best Config: {best_config_path}
Iteration Log: {ITERATIONS_LOG}
""")
|
|
|
|
|
|
# --- Library API for dashboard integration ---

# Shared state for the background-thread API:
#   _stop_event  -- set by request_stop(); polled by run_optimization_loop()
#   _status_lock -- guards every read/write of _status
#   _status      -- progress snapshot exposed through get_status()
_stop_event = threading.Event()
_status_lock = threading.Lock()
_status = dict(
    state="idle",
    iteration=0,
    max_iterations=MAX_ITERATIONS,
    best_score=0.0,
    error=None,
    llm_suggestions=[],
)
|
|
|
|
|
|
def get_status():
    """Return a snapshot of the current optimisation status (thread-safe).

    The ``llm_suggestions`` list is copied too. The worker thread appends to
    the live list while holding the lock, so handing callers the shared list
    would let them iterate it unsynchronised while it is being mutated.
    """
    with _status_lock:
        snapshot = dict(_status)
        suggestions = snapshot.get("llm_suggestions")
        if isinstance(suggestions, list):
            snapshot["llm_suggestions"] = list(suggestions)
        return snapshot
|
|
|
|
|
|
def update_status(**kwargs):
    """Merge the given keyword fields into the shared status dict, under its lock."""
    _status_lock.acquire()
    try:
        _status.update(kwargs)
    finally:
        _status_lock.release()
|
|
|
|
|
|
def run_optimization_loop(callback=None, config_override=None):
    """Run the optimisation loop headlessly, e.g. from a dashboard background thread.

    Follows the same train -> evaluate -> record -> converge? -> LLM cycle as
    ``main()``. Instead of printing, it reports progress through the shared
    status (see ``get_status()``) and an optional callback. It checks
    ``request_stop()`` before each iteration and before each LLM call.

    Args:
        callback: Optional callable. It is invoked as
            ``callback(iteration, iter_data)`` after each successful iteration,
            or as ``callback(iteration, {"error": msg})`` when an ML run or
            its result retrieval fails.
        config_override: Optional starting config dict used instead of the
            saved config files. It is deep-copied, so the caller's dict is
            never modified.

    Raises:
        Any exception escaping the loop is recorded in the status
        (``state="error"``) and then re-raised.
    """
    import copy
    import random

    _stop_event.clear()
    update_status(state="running", iteration=0, error=None, best_score=0.0)

    try:
        os.makedirs(RESULTS_DIR, exist_ok=True)
        ensure_data()

        config_path = os.path.join(CONFIG_DIR, "initial_config.json")
        best_config_path = os.path.join(CONFIG_DIR, "best_config.json")

        if config_override:
            config = copy.deepcopy(config_override)
        elif os.path.exists(best_config_path):
            with open(best_config_path) as f:
                config = json.load(f)
        else:
            with open(config_path) as f:
                config = json.load(f)

        history = load_iteration_history()
        start_iter = len(history) + 1
        # Seed best_score only from valid runs, matching the is_best rule
        # below, so an invalid past outlier cannot block later improvements.
        best_score = max(
            (
                h.get("cost_improvement", 0)
                for h in history
                if h.get("signal_count", 0) >= MIN_SIGNAL_COUNT
            ),
            default=0,
        )
        update_status(best_score=best_score)

        setup_windows_remote()
        scp_to_windows(os.path.join(BASE_DIR, "ml_engine", "train_and_backtest.py"), "train_and_backtest.py")
        for tf in ["1h", "4h"]:
            data_file = os.path.join(DATA_DIR, f"btc_{tf}.csv")
            if os.path.exists(data_file):
                scp_to_windows(data_file, f"btc_{tf}.csv")

        sys.path.insert(0, os.path.join(BASE_DIR, "llm_client"))
        from analyzer import analyze_and_suggest

        for iteration in range(start_iter, MAX_ITERATIONS + 1):
            if _stop_event.is_set():
                update_status(state="completed")
                return
            update_status(iteration=iteration)

            tmp_config = os.path.join(CONFIG_DIR, "current_config.json")
            with open(tmp_config, "w") as f:
                json.dump(config, f, indent=2)
            scp_to_windows(tmp_config, "config.json")

            results_local = os.path.join(RESULTS_DIR, f"results_iter_{iteration}.json")
            try:
                run_ml_training()
                # A failed fetch or a truncated results file is handled like a
                # failed run instead of aborting the whole loop.
                scp_from_windows("results.json", results_local)
                with open(results_local) as f:
                    results = json.load(f)
            except (RuntimeError, subprocess.TimeoutExpired, OSError, ValueError) as e:
                if callback:
                    callback(iteration, {"error": str(e)})
                if history:
                    # Copy so later in-place edits cannot alter the recorded history.
                    config = copy.deepcopy(history[-1].get("config", config))
                continue

            current_score = results.get("cost_basis_improvement_pct", 0)
            signal_count = results.get("strong_buy_signal_count", 0)
            is_best = current_score > best_score and signal_count >= MIN_SIGNAL_COUNT

            if is_best:
                best_score = current_score
                with open(best_config_path, "w") as f:
                    json.dump(config, f, indent=2)
                update_status(best_score=best_score)

            iter_data = {
                "iteration": iteration,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "cost_improvement": current_score,
                "signal_count": signal_count,
                "signal_frequency": results.get("signal_frequency_pct", 0),
                "r2_score": results.get("model_r2_score", 0),
                "score_at_bottoms": results.get("avg_score_at_actual_bottoms", 0),
                "score_at_tops": results.get("avg_score_at_actual_tops", 0),
                "quality": results.get("pct_quality_strong_buy", 0),
                "model_type": config.get("model_type", "unknown"),
                "is_best": is_best,
                # Snapshot: `config` may be modified in place later in this loop.
                "config": copy.deepcopy(config),
                "results": results,
            }
            save_iteration(iter_data)
            history.append(iter_data)

            if callback:
                callback(iteration, iter_data)

            converged, _reason = check_convergence(history)
            if converged:
                update_status(state="completed")
                return

            if iteration >= MAX_ITERATIONS:
                update_status(state="completed")
                return

            if _stop_event.is_set():
                update_status(state="completed")
                return

            try:
                summary_history = [
                    {k: h[k] for k in ("iteration", "cost_improvement", "signal_count", "r2_score", "model_type")
                     if k in h}
                    for h in history
                ]
                new_config, reasoning = analyze_and_suggest(config, results, summary_history)
            except Exception as e:
                # Best-effort fallback, but make the failure visible instead of
                # swallowing it silently.
                log(f"LLM call failed: {e}; applying random perturbation", C.YELLOW)
                config = copy.deepcopy(config)
                hp = config.get("hyperparameters", {})
                hp["learning_rate"] = hp.get("learning_rate", 0.01) * random.uniform(0.8, 1.2)
                hp["max_depth"] = max(3, min(10, hp.get("max_depth", 5) + random.choice([-1, 0, 1])))
                config["hyperparameters"] = hp
            else:
                with _status_lock:
                    _status["llm_suggestions"].append({
                        "iteration": iteration,
                        "reasoning": reasoning,
                    })
                config = new_config

        update_status(state="completed")

    except Exception as e:
        update_status(state="error", error=str(e))
        raise
|
|
|
|
|
|
def request_stop():
    """Ask a running ``run_optimization_loop`` to stop at its next checkpoint.

    This only sets the shared stop event. The loop polls it before starting
    an iteration and before consulting the LLM, so an in-flight remote
    training run is not interrupted.
    """
    stop_flag = _stop_event
    stop_flag.set()
|
|
|
|
|
|
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # A run can last many remote training cycles; exit cleanly on Ctrl-C
        # instead of dumping a traceback. Completed iterations are already in
        # the JSONL log, so the next start resumes after them.
        log("Interrupted by user; completed iterations are saved in the iteration log.", C.YELLOW)
        sys.exit(130)
|