import argparse import asyncio import logging import os import sqlite3 import time from dataclasses import dataclass from datetime import datetime, timezone from decimal import Decimal, InvalidOperation from pathlib import Path from typing import Any from zoneinfo import ZoneInfo import aiohttp from dotenv import load_dotenv load_dotenv() BASE_URL = os.getenv("BINANCE_FAPI_BASE_URL", "https://fapi.binance.com") DB_PATH = Path(os.getenv("DB_PATH", "data/app.db")) INTERVAL = "4h" INTERVAL_MS = 4 * 60 * 60 * 1000 ATR_LENGTH = int(os.getenv("ATR_LENGTH", "14")) ATR_MULTIPLE = Decimal(os.getenv("ATR_MULTIPLE", "1.5")) CRYPTO_ATR_LENGTH = int(os.getenv("CRYPTO_ATR_LENGTH", str(ATR_LENGTH))) CRYPTO_ATR_MULTIPLE = Decimal(os.getenv("CRYPTO_ATR_MULTIPLE", str(ATR_MULTIPLE))) TRADFI_ATR_LENGTH = int(os.getenv("TRADFI_ATR_LENGTH", str(ATR_LENGTH))) TRADFI_ATR_MULTIPLE = Decimal(os.getenv("TRADFI_ATR_MULTIPLE", str(ATR_MULTIPLE))) INIT_KLINES_LIMIT = int(os.getenv("INIT_KLINES_LIMIT", "40")) CONCURRENCY = int(os.getenv("CONCURRENCY", "15")) HTTP_TIMEOUT = int(os.getenv("HTTP_TIMEOUT", "20")) MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3")) RATE_LIMIT_BACKOFF = int(os.getenv("RATE_LIMIT_BACKOFF", "30")) BODY_RATIO_FILTER_ENABLED = os.getenv("BODY_RATIO_FILTER_ENABLED", "false").lower() == "true" MIN_BODY_RATIO = Decimal(os.getenv("MIN_BODY_RATIO", "0.5")) CRYPTO_BODY_RATIO_FILTER_ENABLED = ( os.getenv("CRYPTO_BODY_RATIO_FILTER_ENABLED", str(BODY_RATIO_FILTER_ENABLED)).lower() == "true" ) CRYPTO_MIN_BODY_RATIO = Decimal(os.getenv("CRYPTO_MIN_BODY_RATIO", str(MIN_BODY_RATIO))) TRADFI_BODY_RATIO_FILTER_ENABLED = ( os.getenv("TRADFI_BODY_RATIO_FILTER_ENABLED", str(BODY_RATIO_FILTER_ENABLED)).lower() == "true" ) TRADFI_MIN_BODY_RATIO = Decimal(os.getenv("TRADFI_MIN_BODY_RATIO", str(MIN_BODY_RATIO))) KLINES_RETENTION_PER_SYMBOL = int(os.getenv("KLINES_RETENTION_PER_SYMBOL", "500")) SIGNAL_RETENTION_DAYS = int(os.getenv("SIGNAL_RETENTION_DAYS", "90")) DISCORD_ENABLED = os.getenv("DISCORD_ENABLED", "false").lower() == "true" DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL", "").strip() MARKET_CRYPTO = "CRYPTO" MARKET_TRADFI = "TRADFI" @dataclass(frozen=True) class MarketConfig: market_type: str label: str atr_length: int atr_multiple: Decimal body_ratio_filter_enabled: bool min_body_ratio: Decimal strict_time_grid: bool @dataclass(frozen=True) class ScanResult: market_type: str label: str status: str symbols_total: int symbols_failed: int updated_klines: int signals_created: int elapsed_seconds: float error_summary: str | None = None MARKET_CONFIGS = { MARKET_CRYPTO: MarketConfig( market_type=MARKET_CRYPTO, label="Binance USDT 永续", atr_length=CRYPTO_ATR_LENGTH, atr_multiple=CRYPTO_ATR_MULTIPLE, body_ratio_filter_enabled=CRYPTO_BODY_RATIO_FILTER_ENABLED, min_body_ratio=CRYPTO_MIN_BODY_RATIO, strict_time_grid=True, ), MARKET_TRADFI: MarketConfig( market_type=MARKET_TRADFI, label="Binance TradFi 永续", atr_length=TRADFI_ATR_LENGTH, atr_multiple=TRADFI_ATR_MULTIPLE, body_ratio_filter_enabled=TRADFI_BODY_RATIO_FILTER_ENABLED, min_body_ratio=TRADFI_MIN_BODY_RATIO, strict_time_grid=False, ), } logging.basicConfig( level=os.getenv("LOG_LEVEL", "INFO"), format="%(asctime)s %(levelname)s %(message)s", ) logger = logging.getLogger("crypto-atr-scanner") def utc_now_ms() -> int: return int(datetime.now(timezone.utc).timestamp() * 1000) def latest_theoretical_closed_open_time(now_ms: int | None = None) -> int: now_ms = utc_now_ms() if now_ms is None else now_ms return (now_ms // INTERVAL_MS) * INTERVAL_MS - INTERVAL_MS def ms_to_iso(ms: int) -> str: return datetime.fromtimestamp(ms / 1000, timezone.utc).isoformat() def decimal_from_api(value: Any) -> Decimal: try: return Decimal(str(value)) except (InvalidOperation, ValueError) as exc: raise ValueError(f"Invalid decimal value from API: {value!r}") from exc def db_connect(path: Path | str = DB_PATH) -> sqlite3.Connection: path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(path) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode = WAL;") conn.execute("PRAGMA busy_timeout = 5000;") return conn def init_db(conn: sqlite3.Connection) -> None: existing_symbols = conn.execute( "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'symbols'" ).fetchone() if existing_symbols: symbol_cols = [row["name"] for row in conn.execute("PRAGMA table_info(symbols)")] if "market_type" not in symbol_cols: migrate_v11_to_v12(conn) conn.executescript( """ CREATE TABLE IF NOT EXISTS symbols ( market_type TEXT NOT NULL DEFAULT 'CRYPTO', symbol TEXT NOT NULL, status TEXT NOT NULL, underlying_type TEXT, underlying_subtype TEXT, updated_at TEXT NOT NULL, PRIMARY KEY (market_type, symbol) ); CREATE TABLE IF NOT EXISTS klines ( market_type TEXT NOT NULL DEFAULT 'CRYPTO', symbol TEXT NOT NULL, open_time INTEGER NOT NULL, open TEXT NOT NULL, high TEXT NOT NULL, low TEXT NOT NULL, close TEXT NOT NULL, close_time INTEGER NOT NULL, atr14 TEXT, PRIMARY KEY (market_type, symbol, open_time) ); CREATE TABLE IF NOT EXISTS signals ( market_type TEXT NOT NULL DEFAULT 'CRYPTO', symbol TEXT NOT NULL, direction TEXT NOT NULL, multiple TEXT NOT NULL, range TEXT NOT NULL, atr14 TEXT NOT NULL, open_time INTEGER NOT NULL, created_at TEXT NOT NULL, PRIMARY KEY (market_type, symbol, open_time) ); CREATE INDEX IF NOT EXISTS idx_klines_symbol_time ON klines (market_type, symbol, open_time DESC); CREATE INDEX IF NOT EXISTS idx_signals_time ON signals (market_type, open_time DESC); CREATE TABLE IF NOT EXISTS scan_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, market_type TEXT NOT NULL, started_at TEXT NOT NULL, finished_at TEXT NOT NULL, status TEXT NOT NULL, symbols_total INTEGER NOT NULL DEFAULT 0, symbols_success INTEGER NOT NULL DEFAULT 0, symbols_failed INTEGER NOT NULL DEFAULT 0, updated_klines INTEGER NOT NULL DEFAULT 0, signals_created INTEGER NOT NULL DEFAULT 0, elapsed_seconds TEXT NOT NULL, latest_target_open_time INTEGER, error_summary TEXT ); CREATE INDEX IF NOT EXISTS idx_scan_runs_market_finished ON scan_runs (market_type, finished_at DESC); """ ) conn.commit() def migrate_v11_to_v12(conn: sqlite3.Connection) -> None: logger.info("Migrating database schema to v1.2 market_type layout") conn.executescript( """ ALTER TABLE symbols RENAME TO symbols_v11; ALTER TABLE klines RENAME TO klines_v11; ALTER TABLE signals RENAME TO signals_v11; CREATE TABLE symbols ( market_type TEXT NOT NULL DEFAULT 'CRYPTO', symbol TEXT NOT NULL, status TEXT NOT NULL, underlying_type TEXT, underlying_subtype TEXT, updated_at TEXT NOT NULL, PRIMARY KEY (market_type, symbol) ); CREATE TABLE klines ( market_type TEXT NOT NULL DEFAULT 'CRYPTO', symbol TEXT NOT NULL, open_time INTEGER NOT NULL, open TEXT NOT NULL, high TEXT NOT NULL, low TEXT NOT NULL, close TEXT NOT NULL, close_time INTEGER NOT NULL, atr14 TEXT, PRIMARY KEY (market_type, symbol, open_time) ); CREATE TABLE signals ( market_type TEXT NOT NULL DEFAULT 'CRYPTO', symbol TEXT NOT NULL, direction TEXT NOT NULL, multiple TEXT NOT NULL, range TEXT NOT NULL, atr14 TEXT NOT NULL, open_time INTEGER NOT NULL, created_at TEXT NOT NULL, PRIMARY KEY (market_type, symbol, open_time) ); INSERT INTO symbols (market_type, symbol, status, updated_at) SELECT 'CRYPTO', symbol, status, updated_at FROM symbols_v11; INSERT INTO klines (market_type, symbol, open_time, open, high, low, close, close_time, atr14) SELECT 'CRYPTO', symbol, open_time, open, high, low, close, close_time, atr14 FROM klines_v11; INSERT INTO signals (market_type, symbol, direction, multiple, range, atr14, open_time, created_at) SELECT 'CRYPTO', symbol, direction, multiple, range, atr14, open_time, created_at FROM signals_v11; DROP TABLE symbols_v11; DROP TABLE klines_v11; DROP TABLE signals_v11; """ ) conn.commit() async def fetch_json( session: aiohttp.ClientSession, path: str, params: dict[str, Any] | None = None, ) -> Any: url = f"{BASE_URL}{path}" last_error: Exception | None = None for attempt in range(1, MAX_RETRIES + 1): try: async with session.get(url, params=params) as response: response.raise_for_status() return await response.json() except aiohttp.ClientResponseError as exc: last_error = exc if exc.status in {418, 429}: retry_after = exc.headers.get("Retry-After") if exc.headers else None delay = int(retry_after) if retry_after and retry_after.isdigit() else RATE_LIMIT_BACKOFF logger.warning( "Rate limited attempt=%s status=%s path=%s sleep=%ss", attempt, exc.status, path, delay, ) else: delay = min(2**attempt, 10) logger.warning( "HTTP error attempt=%s status=%s path=%s error=%s", attempt, exc.status, path, exc, ) await asyncio.sleep(delay) except Exception as exc: # noqa: BLE001 - log and retry all transport/API errors. last_error = exc delay = min(2**attempt, 10) logger.warning("Request failed attempt=%s path=%s error=%s", attempt, path, exc) await asyncio.sleep(delay) raise RuntimeError(f"Request failed after {MAX_RETRIES} attempts: {url}") from last_error def normalize_subtype(value: Any) -> str | None: if value is None: return None if isinstance(value, list): return ",".join(str(item) for item in value) return str(value) async def fetch_symbols(session: aiohttp.ClientSession, market_type: str) -> list[dict[str, Any]]: exchange_info = await fetch_json(session, "/fapi/v1/exchangeInfo") symbols: list[dict[str, Any]] = [] for item in exchange_info.get("symbols", []): if item.get("status") != "TRADING" or item.get("quoteAsset") != "USDT": continue subtype = item.get("underlyingSubType") or [] is_tradfi = item.get("contractType") == "TRADIFI_PERPETUAL" or "TradFi" in subtype is_crypto = item.get("contractType") == "PERPETUAL" and not is_tradfi if market_type == MARKET_CRYPTO and not is_crypto: continue if market_type == MARKET_TRADFI and not is_tradfi: continue symbols.append( { "symbol": item["symbol"], "underlying_type": item.get("underlyingType"), "underlying_subtype": normalize_subtype(subtype), } ) return sorted(symbols, key=lambda row: row["symbol"]) async def fetch_klines( session: aiohttp.ClientSession, symbol: str, limit: int = 2, start_time: int | None = None, end_time: int | None = None, ) -> list[list[Any]]: params: dict[str, Any] = {"symbol": symbol, "interval": INTERVAL, "limit": limit} if start_time is not None: params["startTime"] = start_time if end_time is not None: params["endTime"] = end_time return await fetch_json(session, "/fapi/v1/klines", params) def closed_klines(raw_klines: list[list[Any]], now_ms: int) -> list[dict[str, Any]]: klines: list[dict[str, Any]] = [] for item in raw_klines: close_time = int(item[6]) if close_time > now_ms: continue klines.append( { "open_time": int(item[0]), "open": str(item[1]), "high": str(item[2]), "low": str(item[3]), "close": str(item[4]), "close_time": close_time, } ) return sorted(klines, key=lambda row: row["open_time"]) def upsert_symbols(conn: sqlite3.Connection, market_type: str, symbols: list[dict[str, Any]]) -> None: now = datetime.now(timezone.utc).isoformat() active = {row["symbol"] for row in symbols} for row in symbols: conn.execute( """ INSERT INTO symbols ( market_type, symbol, status, underlying_type, underlying_subtype, updated_at ) VALUES (?, ?, 'TRADING', ?, ?, ?) ON CONFLICT(market_type, symbol) DO UPDATE SET status = excluded.status, underlying_type = excluded.underlying_type, underlying_subtype = excluded.underlying_subtype, updated_at = excluded.updated_at """, ( market_type, row["symbol"], row.get("underlying_type"), row.get("underlying_subtype"), now, ), ) existing = conn.execute( "SELECT symbol FROM symbols WHERE market_type = ?", (market_type,), ).fetchall() for row in existing: if row["symbol"] not in active: conn.execute( """ UPDATE symbols SET status = 'INACTIVE', updated_at = ? WHERE market_type = ? AND symbol = ? """, (now, market_type, row["symbol"]), ) conn.commit() def latest_kline(conn: sqlite3.Connection, market_type: str, symbol: str) -> sqlite3.Row | None: return conn.execute( """ SELECT * FROM klines WHERE market_type = ? AND symbol = ? ORDER BY open_time DESC LIMIT 1 """, (market_type, symbol), ).fetchone() def active_symbols_from_db(conn: sqlite3.Connection, market_type: str) -> list[str]: rows = conn.execute( """ SELECT symbol FROM symbols WHERE market_type = ? AND status = 'TRADING' ORDER BY symbol ASC """, (market_type,), ).fetchall() return [row["symbol"] for row in rows] def insert_klines( conn: sqlite3.Connection, market_type: str, symbol: str, klines: list[dict[str, Any]], ) -> None: if not klines: return conn.executemany( """ INSERT INTO klines ( market_type, symbol, open_time, open, high, low, close, close_time, atr14 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL) ON CONFLICT(market_type, symbol, open_time) DO UPDATE SET open = excluded.open, high = excluded.high, low = excluded.low, close = excluded.close, close_time = excluded.close_time """, [ ( market_type, symbol, row["open_time"], row["open"], row["high"], row["low"], row["close"], row["close_time"], ) for row in klines ], ) conn.commit() def recompute_atr_and_signals( conn: sqlite3.Connection, config: MarketConfig, symbol: str, candidate_open_times: set[int], ) -> int: rows = conn.execute( """ SELECT * FROM klines WHERE market_type = ? AND symbol = ? ORDER BY open_time ASC """, (config.market_type, symbol), ).fetchall() if not rows: return 0 prev_close: Decimal | None = None atr: Decimal | None = None tr_values: list[Decimal] = [] signal_count = 0 for row in rows: high = decimal_from_api(row["high"]) low = decimal_from_api(row["low"]) close = decimal_from_api(row["close"]) open_price = decimal_from_api(row["open"]) if prev_close is None: tr = high - low else: tr = max(high - low, abs(high - prev_close), abs(low - prev_close)) tr_values.append(tr) if len(tr_values) == config.atr_length: atr = sum(tr_values) / Decimal(config.atr_length) elif len(tr_values) > config.atr_length and atr is not None: atr = ((atr * Decimal(config.atr_length - 1)) + tr) / Decimal(config.atr_length) atr_text = str(atr) if atr is not None else None conn.execute( """ UPDATE klines SET atr14 = ? WHERE market_type = ? AND symbol = ? AND open_time = ? """, (atr_text, config.market_type, symbol, row["open_time"]), ) if atr is not None and row["open_time"] in candidate_open_times: candle_range = high - low threshold = atr * config.atr_multiple if candle_range >= threshold and close != open_price: if config.body_ratio_filter_enabled: body_ratio = abs(close - open_price) / candle_range if candle_range else Decimal("0") if body_ratio < config.min_body_ratio: prev_close = close continue direction = "Bullish" if close > open_price else "Bearish" multiple = candle_range / atr conn.execute( """ INSERT INTO signals ( market_type, symbol, direction, multiple, range, atr14, open_time, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(market_type, symbol, open_time) DO UPDATE SET direction = excluded.direction, multiple = excluded.multiple, range = excluded.range, atr14 = excluded.atr14, created_at = excluded.created_at """, ( config.market_type, symbol, direction, str(multiple), str(candle_range), str(atr), row["open_time"], datetime.now(timezone.utc).isoformat(), ), ) signal_count += 1 prev_close = close conn.commit() return signal_count def cleanup_old_data(conn: sqlite3.Connection) -> tuple[int, int]: deleted_klines = 0 deleted_signals = 0 if KLINES_RETENTION_PER_SYMBOL > 0: symbols = conn.execute("SELECT DISTINCT market_type, symbol FROM klines").fetchall() for row in symbols: cursor = conn.execute( """ DELETE FROM klines WHERE market_type = ? AND symbol = ? AND open_time NOT IN ( SELECT open_time FROM klines WHERE market_type = ? AND symbol = ? ORDER BY open_time DESC LIMIT ? ) """, ( row["market_type"], row["symbol"], row["market_type"], row["symbol"], KLINES_RETENTION_PER_SYMBOL, ), ) deleted_klines += cursor.rowcount if SIGNAL_RETENTION_DAYS > 0: cutoff = utc_now_ms() - (SIGNAL_RETENTION_DAYS * 24 * 60 * 60 * 1000) cursor = conn.execute("DELETE FROM signals WHERE open_time < ?", (cutoff,)) deleted_signals = cursor.rowcount conn.commit() return deleted_klines, deleted_signals def insert_scan_run( conn: sqlite3.Connection, market_type: str, started_at: str, status: str, symbols_total: int, symbols_success: int, symbols_failed: int, updated_klines: int, signals_created: int, elapsed_seconds: float, latest_target_open_time: int | None, error_summary: str | None = None, ) -> None: conn.execute( """ INSERT INTO scan_runs ( market_type, started_at, finished_at, status, symbols_total, symbols_success, symbols_failed, updated_klines, signals_created, elapsed_seconds, latest_target_open_time, error_summary ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( market_type, started_at, datetime.now(timezone.utc).isoformat(), status, symbols_total, symbols_success, symbols_failed, updated_klines, signals_created, f"{elapsed_seconds:.1f}", latest_target_open_time, error_summary, ), ) conn.commit() async def fetch_missing_closed_klines( session: aiohttp.ClientSession, symbol: str, start_open_time: int, expected_open_time: int, now_ms: int, strict_time_grid: bool, ) -> list[dict[str, Any]]: if start_open_time > expected_open_time: return [] needed = ((expected_open_time - start_open_time) // INTERVAL_MS) + 1 raw = await fetch_klines( session, symbol, limit=max(2, min(1500, needed + 2)), start_time=start_open_time, end_time=expected_open_time + INTERVAL_MS - 1, ) rows = closed_klines(raw, now_ms) by_open = {row["open_time"]: row for row in rows} missing: list[dict[str, Any]] = [] missed_slots = 0 cursor = start_open_time while cursor <= expected_open_time: row = by_open.get(cursor) if row is not None: missing.append(row) else: missed_slots += 1 cursor += INTERVAL_MS if strict_time_grid and missed_slots: logger.warning( "%s missing %s expected 4H slots between %s and %s", symbol, missed_slots, ms_to_iso(start_open_time), ms_to_iso(expected_open_time), ) return missing async def process_symbol( session: aiohttp.ClientSession, conn: sqlite3.Connection, config: MarketConfig, symbol: str, semaphore: asyncio.Semaphore, now_ms: int, expected_open_time: int, ) -> tuple[str, int, int]: async with semaphore: latest = latest_kline(conn, config.market_type, symbol) new_rows: list[dict[str, Any]] if latest is None: raw = await fetch_klines(session, symbol, limit=INIT_KLINES_LIMIT + 1) new_rows = closed_klines(raw, now_ms)[-INIT_KLINES_LIMIT:] if config.strict_time_grid: candidate_open_times = {expected_open_time} else: candidate_open_times = {new_rows[-1]["open_time"]} if new_rows else set() else: latest_open_time = int(latest["open_time"]) if latest_open_time >= expected_open_time: raw = await fetch_klines(session, symbol, limit=2) rows = closed_klines(raw, now_ms) new_rows = [row for row in rows if row["open_time"] > latest_open_time] candidate_open_times = {row["open_time"] for row in new_rows} else: next_expected = latest_open_time + INTERVAL_MS new_rows = await fetch_missing_closed_klines( session, symbol, next_expected, expected_open_time, now_ms, config.strict_time_grid, ) candidate_open_times = {row["open_time"] for row in new_rows} insert_klines(conn, config.market_type, symbol, new_rows) signal_count = recompute_atr_and_signals( conn, config, symbol, candidate_open_times, ) return symbol, len(new_rows), signal_count async def process_symbol_safe( session: aiohttp.ClientSession, conn: sqlite3.Connection, config: MarketConfig, symbol: str, semaphore: asyncio.Semaphore, now_ms: int, expected_open_time: int, ) -> tuple[str, int, int, bool]: try: symbol, inserted, signal_count = await process_symbol( session, conn, config, symbol, semaphore, now_ms, expected_open_time, ) return symbol, inserted, signal_count, True except Exception as exc: # noqa: BLE001 - one failed symbol must not block the scan. logger.warning("%s skipped error=%s", symbol, exc) return symbol, 0, 0, False async def run_scan(market_type: str = MARKET_CRYPTO) -> ScanResult: config = MARKET_CONFIGS[market_type] started_perf = time.perf_counter() started_at = datetime.now(timezone.utc).isoformat() conn = db_connect() init_db(conn) symbols: list[str] = [] expected_open_time: int | None = None timeout = aiohttp.ClientTimeout(total=HTTP_TIMEOUT) async with aiohttp.ClientSession(timeout=timeout) as session: try: symbol_rows = await fetch_symbols(session, market_type) upsert_symbols(conn, market_type, symbol_rows) symbols = [row["symbol"] for row in symbol_rows] except Exception as exc: # noqa: BLE001 - fallback keeps cron useful during API hiccups. symbols = active_symbols_from_db(conn, market_type) if not symbols: elapsed = time.perf_counter() - started_perf insert_scan_run( conn, market_type=market_type, started_at=started_at, status="failed", symbols_total=0, symbols_success=0, symbols_failed=0, updated_klines=0, signals_created=0, elapsed_seconds=elapsed, latest_target_open_time=None, error_summary=f"symbol refresh failed and no cached symbols: {exc}", ) logger.error( "Scan aborted: symbol refresh failed and database has no cached symbols elapsed=%.1fs error=%s", elapsed, exc, ) raise logger.warning( "Symbol refresh failed, using %s cached symbols error=%s", len(symbols), exc, ) now_ms = utc_now_ms() expected_open_time = latest_theoretical_closed_open_time(now_ms) semaphore = asyncio.Semaphore(CONCURRENCY) logger.info( "Scanning market=%s symbols=%s latest_closed_open=%s atr_length=%s atr_multiple=%s strict_time_grid=%s", market_type, len(symbols), ms_to_iso(expected_open_time), config.atr_length, config.atr_multiple, config.strict_time_grid, ) tasks = [ process_symbol_safe(session, conn, config, symbol, semaphore, now_ms, expected_open_time) for symbol in symbols ] updated = 0 signals = 0 failures = 0 success = 0 for task in asyncio.as_completed(tasks): symbol, inserted, signal_count, ok = await task if not ok: failures += 1 continue success += 1 updated += inserted signals += signal_count if inserted: logger.info("%s updated_klines=%s signals=%s", symbol, inserted, signal_count) elapsed = time.perf_counter() - started_perf status = "success" if failures == 0 else "partial" insert_scan_run( conn, market_type=market_type, started_at=started_at, status=status, symbols_total=len(symbols), symbols_success=success, symbols_failed=failures, updated_klines=updated, signals_created=signals, elapsed_seconds=elapsed, latest_target_open_time=expected_open_time, ) conn.close() speed = len(symbols) / elapsed if elapsed > 0 else 0 logger.info( "Scan done market=%s symbols=%s updated_klines=%s signals=%s failures=%s elapsed=%.1fs speed=%.1f_symbols/s", market_type, len(symbols), updated, signals, failures, elapsed, speed, ) logger.info("%.1f seconds scan complete", elapsed) cleanup_conn = db_connect() deleted_klines, deleted_signals = cleanup_old_data(cleanup_conn) cleanup_conn.close() if deleted_klines or deleted_signals: logger.info( "Cleanup done deleted_klines=%s deleted_signals=%s", deleted_klines, deleted_signals, ) return ScanResult( market_type=market_type, label=config.label, status=status, symbols_total=len(symbols), symbols_failed=failures, updated_klines=updated, signals_created=signals, elapsed_seconds=elapsed, ) async def send_discord_summary(results: list[ScanResult], elapsed_seconds: float) -> None: if not DISCORD_ENABLED: return if not DISCORD_WEBHOOK_URL: logger.warning("Discord notification skipped: DISCORD_WEBHOOK_URL is empty") return madrid_time = datetime.now(timezone.utc).astimezone(ZoneInfo("Europe/Madrid")) status_labels = {"success": "成功", "partial": "部分失败", "failed": "失败"} lines = ["**Crypto ATR Signal · 4H 扫描完成**", ""] for result in results: market_name = "Crypto" if result.market_type == MARKET_CRYPTO else "TradFi" status = status_labels.get(result.status, result.status) lines.append( f"{market_name}:{result.signals_created} 个信号 · {status} · " f"失败 {result.symbols_failed}" ) lines.extend( [ f"合计:{sum(result.signals_created for result in results)} 个信号", f"耗时:{elapsed_seconds:.1f} 秒", f"时间:{madrid_time.strftime('%Y-%m-%d %H:%M')} 马德里", ] ) payload = {"content": "\n".join(lines), "allowed_mentions": {"parse": []}} timeout = aiohttp.ClientTimeout(total=HTTP_TIMEOUT) try: async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post(DISCORD_WEBHOOK_URL, json=payload) as response: if response.status not in {200, 204}: body = await response.text() logger.warning( "Discord notification failed status=%s body=%s", response.status, body[:300], ) return logger.info("Discord aggregate notification sent") except Exception as exc: # noqa: BLE001 - notification must never fail the scan. logger.warning("Discord notification skipped error=%s", exc) async def run_all_markets() -> list[ScanResult]: started = time.perf_counter() results: list[ScanResult] = [] for market_type in (MARKET_CRYPTO, MARKET_TRADFI): try: results.append(await run_scan(market_type)) except Exception as exc: # noqa: BLE001 - one market must not block the other. config = MARKET_CONFIGS[market_type] logger.error("Market scan failed market=%s error=%s", market_type, exc) results.append( ScanResult( market_type=market_type, label=config.label, status="failed", symbols_total=0, symbols_failed=0, updated_klines=0, signals_created=0, elapsed_seconds=0, error_summary=str(exc), ) ) await send_discord_summary(results, time.perf_counter() - started) return results if __name__ == "__main__": parser = argparse.ArgumentParser(description="Scan Binance ATR signals") parser.add_argument( "--market", choices=["crypto", "tradfi", "all"], default="crypto", help="Market to scan: crypto, tradfi, or all", ) args = parser.parse_args() if args.market == "all": asyncio.run(run_all_markets()) else: selected_market = MARKET_TRADFI if args.market == "tradfi" else MARKET_CRYPTO asyncio.run(run_scan(selected_market))